From f61eed1a6754268e5f5ca937dc9b7919e58d4675 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Thu, 8 Feb 2018 10:51:01 +0100 Subject: [PATCH] Client refactor: move part of `Lwt_utils` in `Lwt_utils_unix` --- src/bin_node/node_config_file.ml | 2 +- src/bin_node/node_run_command.ml | 2 +- src/lib_base/protocol.ml | 8 +- src/lib_client_base/client_commands.ml | 4 +- src/lib_p2p/p2p_io_scheduler.ml | 12 +- src/lib_p2p/p2p_maintenance.ml | 6 +- src/lib_p2p/p2p_pool.ml | 22 +-- src/lib_p2p/p2p_socket.ml | 18 +-- src/lib_p2p/p2p_welcome.ml | 6 +- src/lib_p2p/test/test_p2p_io_scheduler.ml | 2 +- src/lib_p2p/test/test_p2p_pool.ml | 8 +- src/lib_p2p/test/test_p2p_socket.ml | 2 +- src/lib_protocol_compiler/compiler.ml | 6 +- src/lib_shell/block_validator.ml | 4 +- src/lib_shell/bootstrap_pipeline.ml | 20 +-- src/lib_shell/distributed_db.ml | 2 +- src/lib_shell/protocol_validator.ml | 4 +- src/lib_shell/worker.ml | 6 +- src/lib_stdlib_lwt/lwt_lock_file.ml | 4 +- src/lib_stdlib_lwt/lwt_utils.ml | 154 -------------------- src/lib_stdlib_lwt/lwt_utils.mli | 33 ----- src/lib_stdlib_lwt/lwt_utils_unix.ml | 168 ++++++++++++++++++++++ src/lib_stdlib_lwt/lwt_utils_unix.mli | 45 ++++++ 23 files changed, 282 insertions(+), 256 deletions(-) create mode 100644 src/lib_stdlib_lwt/lwt_utils_unix.ml create mode 100644 src/lib_stdlib_lwt/lwt_utils_unix.mli diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index ddfd97827..93fbef514 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -537,7 +537,7 @@ let resolve_addr ?default_port ?(passive = false) peer = invalid_arg "" | "", Some default_port -> string_of_int default_port | port, _ -> port in - Lwt_utils.getaddrinfo ~passive ~node ~service + Lwt_utils_unix.getaddrinfo ~passive ~node ~service let resolve_addrs ?default_port ?passive peers = Lwt_list.fold_left_s begin fun a peer -> diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 8647e2f81..7c1cd6e02 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -93,7 +93,7 @@ let init_logger ?verbosity (log_config : Node_config_file.log) = let init_node ?sandbox (config : Node_config_file.t) = let patch_context json ctxt = let module Proto = (val Registred_protocol.get_exn genesis.protocol) in - Lwt_utils.protect begin fun () -> + Lwt_utils_unix.protect begin fun () -> Proto.configure_sandbox ctxt json end >|= function | Error err -> diff --git a/src/lib_base/protocol.ml b/src/lib_base/protocol.ml index 9eb1d344c..2c7a16901 100644 --- a/src/lib_base/protocol.ml +++ b/src/lib_base/protocol.ml @@ -168,18 +168,18 @@ let read_dir dir = open Lwt.Infix let create_files dir units = - Lwt_utils.remove_dir dir >>= fun () -> - Lwt_utils.create_dir dir >>= fun () -> + Lwt_utils_unix.remove_dir dir >>= fun () -> + Lwt_utils_unix.create_dir dir >>= fun () -> Lwt_list.map_s (fun { name ; interface ; implementation } -> let name = String.lowercase_ascii name in let ml = dir // (name ^ ".ml") in let mli = dir // (name ^ ".mli") in - Lwt_utils.create_file ml implementation >>= fun () -> + Lwt_utils_unix.create_file ml implementation >>= fun () -> match interface with | None -> Lwt.return [ml] | Some content -> - Lwt_utils.create_file mli content >>= fun () -> + Lwt_utils_unix.create_file mli content >>= fun () -> Lwt.return [ mli ; ml ]) units >>= fun files -> let files = List.concat files in diff --git a/src/lib_client_base/client_commands.ml b/src/lib_client_base/client_commands.ml index b8ef47dde..725d21327 100644 --- a/src/lib_client_base/client_commands.ml +++ b/src/lib_client_base/client_commands.ml @@ -91,7 +91,7 @@ class file_wallet dir : wallet = object (self) fun alias_name list encoding -> Lwt.catch (fun () -> - Lwt_utils.create_dir dir >>= fun () -> + Lwt_utils_unix.create_dir dir >>= fun () -> let filename = self#filename alias_name in let json = Data_encoding.Json.construct encoding list in Data_encoding_ezjsonm.write_file filename json) @@ -127,7 +127,7 @@ let default_log ~base_dir channel msg = Lwt.return () | log -> let (//) = Filename.concat in - Lwt_utils.create_dir (base_dir // "logs" // log) >>= fun () -> + Lwt_utils_unix.create_dir (base_dir // "logs" // log) >>= fun () -> Lwt_io.with_file ~flags: Unix.[ O_APPEND ; O_CREAT ; O_WRONLY ] ~mode: Lwt_io.Output diff --git a/src/lib_p2p/p2p_io_scheduler.ml b/src/lib_p2p/p2p_io_scheduler.ml index 7a2b09b4c..41d0d248f 100644 --- a/src/lib_p2p/p2p_io_scheduler.ml +++ b/src/lib_p2p/p2p_io_scheduler.ml @@ -120,7 +120,7 @@ module Scheduler(IO : IO) = struct false, (Queue.pop st.readys_low) in match msg with - | Error [Lwt_utils.Canceled] -> + | Error [ Lwt_utils_unix.Canceled ] -> worker_loop st | Error ([Connection_closed | Exn ( Lwt_pipe.Closed | @@ -140,7 +140,7 @@ module Scheduler(IO : IO) = struct conn.current_push <- begin IO.push conn.out_param msg >>= function | Ok () - | Error [Lwt_utils.Canceled] -> + | Error [ Lwt_utils_unix.Canceled ] -> return () | Error ([Connection_closed | Exn (Unix.Unix_error (EBADF, _, _) | @@ -264,7 +264,7 @@ module WriteScheduler = Scheduler(struct let push fd buf = Lwt.catch (fun () -> - Lwt_utils.write_mbytes fd buf >>= return) + Lwt_utils_unix.write_mbytes fd buf >>= return) (function | Unix.Unix_error(Unix.ECONNRESET, _, _) | Unix.Unix_error(Unix.EPIPE, _, _) @@ -357,7 +357,7 @@ let register = let cpt = ref 0 in fun st conn -> if st.closed then begin - Lwt.async (fun () -> Lwt_utils.safe_close conn) ; + Lwt.async (fun () -> Lwt_utils_unix.safe_close conn) ; raise Closed end else begin let id = incr cpt; !cpt in @@ -380,7 +380,7 @@ let register = Moving_average.destroy write_conn.counter ; Lwt_pipe.close write_queue ; Lwt_pipe.close read_queue ; - Lwt_utils.safe_close conn + Lwt_utils_unix.safe_close conn end ; let conn = { sched = st ; id ; conn ; canceler ; @@ -481,7 +481,7 @@ let close ?timeout conn = | None -> return (Lwt_canceler.cancelation conn.canceler) | Some timeout -> - Lwt_utils.with_timeout + Lwt_utils_unix.with_timeout ~canceler:conn.canceler timeout begin fun canceler -> return (Lwt_canceler.cancelation canceler) end diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index 56d0f327f..95946b54c 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -119,7 +119,7 @@ and too_few_connections st n_connected = discover the local network and then wait *) Option.iter ~f:P2p_discovery.restart st.disco ; P2p_pool.broadcast_bootstrap_msg pool ; - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Lwt.pick [ P2p_pool.Pool_event.wait_new_peer pool ; Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? @@ -146,7 +146,7 @@ and too_many_connections st n_connected = let rec worker_loop st = let Pool pool = st.pool in begin - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Lwt.pick [ Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_condition.wait st.please_maintain ; (* when asked *) @@ -165,7 +165,7 @@ let rec worker_loop st = end end >>= function | Ok () -> worker_loop st - | Error [Lwt_utils.Canceled] -> Lwt.return_unit + | Error [ Lwt_utils_unix.Canceled ] -> Lwt.return_unit | Error _ -> Lwt.return_unit let run ~connection_timeout bounds pool disco = diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 92d689f52..9a880324e 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -87,7 +87,7 @@ module Answerer = struct let rec worker_loop st = Lwt_unix.yield () >>= fun () -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> P2p_socket.read st.conn end >>= function | Ok (_, Bootstrap) -> begin @@ -122,7 +122,7 @@ module Answerer = struct (* TODO: Penalize peer... *) Lwt_canceler.cancel st.canceler >>= fun () -> Lwt.return_unit - | Error [Lwt_utils.Canceled] -> + | Error [Lwt_utils_unix.Canceled] -> Lwt.return_unit | Error err -> lwt_log_error "@[Answerer unexpected error:@ %a@]" @@ -568,7 +568,7 @@ let rec connect ~timeout pool point = (active_connections pool <= pool.config.max_connections) Too_many_connections >>=? fun () -> let canceler = Lwt_canceler.create () in - Lwt_utils.with_timeout ~canceler timeout begin fun canceler -> + Lwt_utils_unix.with_timeout ~canceler timeout begin fun canceler -> let point_info = register_point pool pool.config.identity.peer_id point in let addr, port as point = P2p_point_state.Info.point point_info in @@ -581,14 +581,14 @@ let rec connect ~timeout pool point = let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () -> - Lwt_utils.protect ~canceler begin fun () -> + Lwt_utils_unix.protect ~canceler begin fun () -> log pool (Outgoing_connection point) ; Lwt_unix.connect fd uaddr >>= fun () -> return () end ~on_error: begin fun err -> lwt_debug "connect: %a -> disconnect" P2p_point.Id.pp point >>= fun () -> P2p_point_state.set_disconnected point_info ; - Lwt_utils.safe_close fd >>= fun () -> + Lwt_utils_unix.safe_close fd >>= fun () -> match err with | [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] -> fail Connection_refused @@ -603,7 +603,7 @@ and authenticate pool ?point_info canceler fd point = lwt_debug "authenticate: %a%s" P2p_point.Id.pp point (if incoming then " incoming" else "") >>= fun () -> - Lwt_utils.protect ~canceler begin fun () -> + Lwt_utils_unix.protect ~canceler begin fun () -> P2p_socket.authenticate ~proof_of_work_target:pool.config.proof_of_work_target ~incoming (P2p_io_scheduler.register pool.io_sched fd) point @@ -612,7 +612,7 @@ and authenticate pool ?point_info canceler fd point = end ~on_error: begin fun err -> (* TODO do something when the error is Not_enough_proof_of_work ?? *) begin match err with - | [ Lwt_utils.Canceled ] -> + | [ Lwt_utils_unix.Canceled ] -> (* Currently only on time out *) lwt_debug "authenticate: %a%s -> canceled" P2p_point.Id.pp point @@ -682,7 +682,7 @@ and authenticate pool ?point_info canceler fd point = lwt_debug "authenticate: %a -> accept %a" P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> - Lwt_utils.protect ~canceler begin fun () -> + Lwt_utils_unix.protect ~canceler begin fun () -> P2p_socket.accept ?incoming_message_queue_size:pool.config.incoming_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size @@ -881,7 +881,7 @@ and swap pool conn current_peer_id new_point = pool.latest_accepted_swap <- pool.latest_succesfull_swap ; log pool (Swap_failure { source = source_peer_id }) ; match err with - | [ Lwt_utils.Timeout ] -> + | [ Lwt_utils_unix.Timeout ] -> lwt_debug "Swap to %a was interupted: %a" P2p_point.Id.pp new_point pp_print_error err | _ -> @@ -893,12 +893,12 @@ let accept pool fd point = log pool (Incoming_connection point) ; if pool.config.max_incoming_connections <= P2p_point.Table.length pool.incoming || pool.config.max_connections <= active_connections pool then - Lwt.async (fun () -> Lwt_utils.safe_close fd) + Lwt.async (fun () -> Lwt_utils_unix.safe_close fd) else let canceler = Lwt_canceler.create () in P2p_point.Table.add pool.incoming point canceler ; Lwt.async begin fun () -> - Lwt_utils.with_timeout + Lwt_utils_unix.with_timeout ~canceler pool.config.authentification_timeout (fun canceler -> authenticate pool canceler fd point) end diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index bd663e69b..29e105f43 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -249,7 +249,7 @@ module Reader = struct lwt_debug "[read_message] incremental decoding error" >>= fun () -> return None | Await decode_next_buf -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Crypto.read_chunk st.conn.fd st.conn.cryptobox_data end >>=? fun buf -> lwt_debug @@ -265,12 +265,12 @@ module Reader = struct read_message st init_mbytes >>=? fun msg -> match msg with | None -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> return None end | Some (msg, size, rem_mbytes) -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> return (Some rem_mbytes) end @@ -280,7 +280,7 @@ module Reader = struct | Ok None -> Lwt_canceler.cancel st.canceler >>= fun () -> Lwt.return_unit - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" P2p_connection.Info.pp st.conn.info >>= fun () -> Lwt.return_unit @@ -331,7 +331,7 @@ module Writer = struct let rec loop = function | [] -> return () | buf :: l -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf end >>=? fun () -> lwt_debug "writing %d bytes to %a" @@ -345,10 +345,10 @@ module Writer = struct let rec worker_loop st = Lwt_unix.yield () >>= fun () -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Lwt_pipe.pop st.messages >>= return end >>= function - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" P2p_connection.Info.pp st.conn.info >>= fun () -> Lwt.return_unit @@ -370,7 +370,7 @@ module Writer = struct Lwt.wakeup_later u (Error [P2p_io_scheduler.Connection_closed])) ; match err with - | [ Lwt_utils.Canceled | Exn Lwt_pipe.Closed ] -> + | [ Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed ] -> lwt_debug "connection closed to %a" P2p_connection.Info.pp st.conn.info >>= fun () -> Lwt.return_unit @@ -453,7 +453,7 @@ let info { conn } = conn.info let accept ?incoming_message_queue_size ?outgoing_message_queue_size ?binary_chunks_size (fd, info, cryptobox_data) encoding = - Lwt_utils.protect begin fun () -> + Lwt_utils_unix.protect begin fun () -> Ack.write fd cryptobox_data Ack >>=? fun () -> Ack.read fd cryptobox_data end ~on_error:begin fun err -> diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index 1c357fd2d..a0b541591 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -21,7 +21,7 @@ type t = { let rec worker_loop st = let Pool pool = st.pool in Lwt_unix.yield () >>= fun () -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> Lwt_unix.accept st.socket >>= return end >>= function | Ok (fd, addr) -> @@ -32,7 +32,7 @@ let rec worker_loop st = (Ipaddr_unix.V6.of_inet_addr_exn addr, port) in P2p_pool.accept pool fd point ; worker_loop st - | Error [Lwt_utils.Canceled] -> + | Error [Lwt_utils_unix.Canceled] -> Lwt.return_unit | Error err -> lwt_log_error "@[Unexpected error in the Welcome worker@ %a@]" @@ -53,7 +53,7 @@ let run ~backlog pool ?addr port = ~backlog ?addr port >>= fun socket -> let canceler = Lwt_canceler.create () in Lwt_canceler.on_cancel canceler begin fun () -> - Lwt_utils.safe_close socket + Lwt_utils_unix.safe_close socket end ; let st = { socket ; canceler ; pool = Pool pool ; diff --git a/src/lib_p2p/test/test_p2p_io_scheduler.ml b/src/lib_p2p/test/test_p2p_io_scheduler.ml index c872a93ca..d0d5299fe 100644 --- a/src/lib_p2p/test/test_p2p_io_scheduler.ml +++ b/src/lib_p2p/test/test_p2p_io_scheduler.ml @@ -151,7 +151,7 @@ let run let client n = let prefix = Printf.sprintf "client(%d): " n in Process.detach ~prefix begin fun _ -> - Lwt_utils.safe_close main_socket >>= fun () -> + Lwt_utils_unix.safe_close main_socket >>= fun () -> client ?max_upload_speed ?write_queue_size addr port time n end in Lwt_list.map_p client (1 -- n) >>= fun client_nodes -> diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index 2f1ae3ff5..b9ff90c0d 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -122,8 +122,8 @@ module Simple = struct | Error ([ P2p_pool.Connection_refused | P2p_pool.Pending_connection | P2p_socket.Rejected - | Lwt_utils.Canceled - | Lwt_utils.Timeout + | Lwt_utils_unix.Canceled + | Lwt_utils_unix.Timeout | P2p_pool.Rejected _ as err ]) -> lwt_log_info "Connection to %a failed (%a)" P2p_point.Id.pp point @@ -134,9 +134,9 @@ module Simple = struct Format.fprintf ppf "pending connection" | P2p_socket.Rejected -> Format.fprintf ppf "rejected" - | Lwt_utils.Canceled -> + | Lwt_utils_unix.Canceled -> Format.fprintf ppf "canceled" - | Lwt_utils.Timeout -> + | Lwt_utils_unix.Timeout -> Format.fprintf ppf "timeout" | P2p_pool.Rejected peer -> Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index 8173e8275..d161f37f8 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -76,7 +76,7 @@ let run_nodes client server = return () end >>= fun server_node -> Process.detach ~prefix:"client: " begin fun channel -> - Lwt_utils.safe_close main_socket >>= fun () -> + Lwt_utils_unix.safe_close main_socket >>= fun () -> let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in client channel sched default_addr port >>=? fun () -> P2p_io_scheduler.shutdown sched >>= fun () -> diff --git a/src/lib_protocol_compiler/compiler.ml b/src/lib_protocol_compiler/compiler.ml index 9cf1e7177..a847aea00 100644 --- a/src/lib_protocol_compiler/compiler.ml +++ b/src/lib_protocol_compiler/compiler.ml @@ -174,12 +174,12 @@ let main { compile_ml ; pack_objects ; link_shared } = match !build_dir with | None -> let dir = mktemp_dir () in - at_exit (fun () -> Lwt_main.run (Lwt_utils.remove_dir dir)) ; + at_exit (fun () -> Lwt_main.run (Lwt_utils_unix.remove_dir dir)) ; dir | Some dir -> dir in - Lwt_main.run (Lwt_utils.create_dir ~perm:0o755 build_dir) ; - Lwt_main.run (Lwt_utils.create_dir ~perm:0o755 (Filename.dirname output)) ; let hash, protocol = Protocol.read_dir source_dir in + Lwt_main.run (Lwt_utils_unix.create_dir ~perm:0o755 build_dir) ; + Lwt_main.run (Lwt_utils_unix.create_dir ~perm:0o755 (Filename.dirname output)) ; (* Generate the 'functor' *) let functor_file = build_dir // "functor.ml" in let oc = open_out functor_file in diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index 0052ac47f..268b1f5ad 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -241,7 +241,7 @@ let on_request net_state header.shell.predecessor >>=? fun pred -> get_proto pred hash >>=? fun proto -> (* TODO also protect with [Worker.canceler w]. *) - Lwt_utils.protect ?canceler begin fun () -> + Lwt_utils_unix.protect ?canceler begin fun () -> apply_block (Distributed_db.net_state net_db) pred proto hash header operations @@ -264,7 +264,7 @@ let on_request end (* TODO catch other temporary error (e.g. system errors) and do not 'commit' them on disk... *) - | Error [Lwt_utils.Canceled | Unavailable_protocol _] as err -> + | Error [Lwt_utils_unix.Canceled | Unavailable_protocol _] as err -> return err | Error errors -> Worker.protect w begin fun () -> diff --git a/src/lib_shell/bootstrap_pipeline.ml b/src/lib_shell/bootstrap_pipeline.ml index bc1c9feb4..215a2bad0 100644 --- a/src/lib_shell/bootstrap_pipeline.ml +++ b/src/lib_shell/bootstrap_pipeline.ml @@ -55,7 +55,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) = lwt_debug "fetching block header %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Distributed_db.Block_header.fetch ~timeout:pipeline.block_header_timeout pipeline.net_db ~peer:pipeline.peer_id @@ -69,7 +69,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) = fetch_loop [] step.block step.step >>=? fun headers -> iter_s begin fun header -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.push pipeline.fetched_headers header >>= return end end @@ -87,7 +87,7 @@ let headers_fetch_worker_loop pipeline = P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> Lwt_pipe.close pipeline.fetched_headers ; Lwt.return_unit - | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> Lwt.return_unit | Error [ Distributed_db.Block_header.Timeout bh ] -> lwt_log_info "request for header %a from peer %a timed out." @@ -105,7 +105,7 @@ let headers_fetch_worker_loop pipeline = let rec operations_fetch_worker_loop pipeline = begin Lwt_unix.yield () >>= fun () -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.pop pipeline.fetched_headers >>= return end >>=? fun (hash, header) -> lwt_log_info "fetching operations of block %a from peer %a." @@ -113,7 +113,7 @@ let rec operations_fetch_worker_loop pipeline = P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> map_p (fun i -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Distributed_db.Operations.fetch ~timeout:pipeline.block_operations_timeout pipeline.net_db ~peer:pipeline.peer_id @@ -123,14 +123,14 @@ let rec operations_fetch_worker_loop pipeline = lwt_log_info "fetched operations of block %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.push pipeline.fetched_blocks (hash, header, operations) >>= return end end >>= function | Ok () -> operations_fetch_worker_loop pipeline - | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> Lwt_pipe.close pipeline.fetched_blocks ; Lwt.return_unit | Error [ Distributed_db.Operations.Timeout (bh, n) ] -> @@ -149,13 +149,13 @@ let rec operations_fetch_worker_loop pipeline = let rec validation_worker_loop pipeline = begin Lwt_unix.yield () >>= fun () -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.pop pipeline.fetched_blocks >>= return end >>=? fun (hash, header, operations) -> lwt_log_info "requesting validation for block %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> Block_validator.validate ~canceler:pipeline.canceler ~notify_new_block:pipeline.notify_new_block @@ -168,7 +168,7 @@ let rec validation_worker_loop pipeline = return () end >>= function | Ok () -> validation_worker_loop pipeline - | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> Lwt.return_unit | Error ([ Block_validator_errors.Invalid_block _ | Block_validator_errors.Unavailable_protocol _ ] as err ) -> diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index 2f806d923..d406e00d4 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -629,7 +629,7 @@ module P2p_reader = struct end let rec worker_loop global_db state = - Lwt_utils.protect ~canceler:state.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:state.canceler begin fun () -> P2p.recv global_db.p2p state.conn end >>= function | Ok msg -> diff --git a/src/lib_shell/protocol_validator.ml b/src/lib_shell/protocol_validator.ml index b1f3f804d..9fbb78cd7 100644 --- a/src/lib_shell/protocol_validator.ml +++ b/src/lib_shell/protocol_validator.ml @@ -78,7 +78,7 @@ let () = let rec worker_loop bv = begin - Lwt_utils.protect ~canceler:bv.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:bv.canceler begin fun () -> Lwt_pipe.pop bv.messages >>= return end >>=? function Message (request, wakener) -> match request with @@ -115,7 +115,7 @@ let rec worker_loop bv = end >>= function | Ok () -> worker_loop bv - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> lwt_log_notice "terminating" >>= fun () -> Lwt.return_unit | Error err -> diff --git a/src/lib_shell/worker.ml b/src/lib_shell/worker.ml index c2ecbe266..3ed893db5 100644 --- a/src/lib_shell/worker.ml +++ b/src/lib_shell/worker.ml @@ -202,7 +202,7 @@ module Make Lwt.ignore_result (Lwt_canceler.cancel w.canceler) let protect { canceler } ?on_error f = - Lwt_utils.protect ?on_error ~canceler f + Lwt_utils_unix.protect ?on_error ~canceler f let canceler { canceler } = canceler @@ -270,7 +270,7 @@ module Make Lwt.return_unit in let rec loop () = begin - Lwt_utils.protect ~canceler:w.canceler begin fun () -> + Lwt_utils_unix.protect ~canceler:w.canceler begin fun () -> pop w end >>=? function | None -> Handlers.on_no_request w @@ -301,7 +301,7 @@ module Make end >>= function | Ok () -> loop () - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] -> + | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] -> Logger.lwt_log_info "[%a] worker terminated" Name.pp w.name >>= fun () -> diff --git a/src/lib_stdlib_lwt/lwt_lock_file.ml b/src/lib_stdlib_lwt/lwt_lock_file.ml index 7f1356503..7f86b50b7 100644 --- a/src/lib_stdlib_lwt/lwt_lock_file.ml +++ b/src/lib_stdlib_lwt/lwt_lock_file.ml @@ -34,7 +34,7 @@ let blocking_create create_inner Unix.F_LOCK ~close_on_exec ~unlink_on_exit fn in match timeout with | None -> create () - | Some duration -> Lwt_utils.with_timeout duration (fun _ -> create ()) + | Some duration -> Lwt_utils_unix.with_timeout duration (fun _ -> create ()) let is_locked fn = if not @@ Sys.file_exists fn then return false else @@ -50,7 +50,7 @@ let is_locked fn = let get_pid fn = let open Lwt_io in - Lwt_utils.protect begin fun () -> + Lwt_utils_unix.protect begin fun () -> with_file ~mode:Input fn begin fun ic -> read ic >>= fun content -> return (int_of_string content) diff --git a/src/lib_stdlib_lwt/lwt_utils.ml b/src/lib_stdlib_lwt/lwt_utils.ml index 17eab7e4c..f6ca5cd58 100644 --- a/src/lib_stdlib_lwt/lwt_utils.ml +++ b/src/lib_stdlib_lwt/lwt_utils.ml @@ -161,161 +161,7 @@ let stable_sort cmp l = let sort = stable_sort -let read_bytes ?(pos = 0) ?len fd buf = - let len = match len with None -> Bytes.length buf - pos | Some l -> l in - let rec inner pos len = - if len = 0 then - Lwt.return_unit - else - Lwt_unix.read fd buf pos len >>= function - | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) - | nb_read -> inner (pos + nb_read) (len - nb_read) - in - inner pos len - -let read_mbytes ?(pos=0) ?len fd buf = - let len = match len with None -> MBytes.length buf - pos | Some l -> l in - let rec inner pos len = - if len = 0 then - Lwt.return_unit - else - Lwt_bytes.read fd buf pos len >>= function - | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) - | nb_read -> inner (pos + nb_read) (len - nb_read) - in - inner pos len - -let write_mbytes ?(pos=0) ?len descr buf = - let len = match len with None -> MBytes.length buf - pos | Some l -> l in - let rec inner pos len = - if len = 0 then - Lwt.return_unit - else - Lwt_bytes.write descr buf pos len >>= function - | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) - | nb_written -> inner (pos + nb_written) (len - nb_written) in - inner pos len - -let write_bytes ?(pos=0) ?len descr buf = - let len = match len with None -> Bytes.length buf - pos | Some l -> l in - let rec inner pos len = - if len = 0 then - Lwt.return_unit - else - Lwt_unix.write descr buf pos len >>= function - | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) - | nb_written -> inner (pos + nb_written) (len - nb_written) in - inner pos len - -let (>>=) = Lwt.bind - -let remove_dir dir = - let rec remove dir = - let files = Lwt_unix.files_of_directory dir in - Lwt_stream.iter_s - (fun file -> - if file = "." || file = ".." then - Lwt.return () - else begin - let file = Filename.concat dir file in - if Sys.is_directory file - then remove file - else Lwt_unix.unlink file - end) - files >>= fun () -> - Lwt_unix.rmdir dir in - if Sys.file_exists dir && Sys.is_directory dir then - remove dir - else - Lwt.return () - -let rec create_dir ?(perm = 0o755) dir = - Lwt_unix.file_exists dir >>= function - | false -> - create_dir (Filename.dirname dir) >>= fun () -> - Lwt_unix.mkdir dir perm - | true -> - Lwt_unix.stat dir >>= function - | { st_kind = S_DIR ; _ } -> Lwt.return_unit - | _ -> failwith "Not a directory" - -let create_file ?(perm = 0o644) name content = - Lwt_unix.openfile name Unix.([O_TRUNC; O_CREAT; O_WRONLY]) perm >>= fun fd -> - Lwt_unix.write_string fd content 0 (String.length content) >>= fun _ -> - Lwt_unix.close fd - -let safe_close fd = - Lwt.catch - (fun () -> Lwt_unix.close fd) - (fun _ -> Lwt.return_unit) - -open Error_monad - -type error += Canceled - -let protect ?on_error ?canceler t = - let cancelation = - match canceler with - | None -> never_ending - | Some canceler -> - (Lwt_canceler.cancelation canceler >>= fun () -> - fail Canceled ) in - let res = - Lwt.pick [ cancelation ; - Lwt.catch t (fun exn -> fail (Exn exn)) ] in - res >>= function - | Ok _ -> res - | Error err -> - let canceled = - Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in - let err = if canceled then [Canceled] else err in - match on_error with - | None -> Lwt.return (Error err) - | Some on_error -> - Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn)) - -type error += Timeout - -let () = - Error_monad.register_error_kind - `Temporary - ~id:"utils.Timeout" - ~title:"Timeout" - ~description:"Timeout" - Data_encoding.unit - (function Timeout -> Some () | _ -> None) - (fun () -> Timeout) - -let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f = - let timeout = Lwt_unix.sleep timeout in - let target = f canceler in - Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () -> - Lwt_unix.yield () >>= fun () -> - if Lwt.state target <> Lwt.Sleep then begin - Lwt.cancel timeout ; - target - end else begin - Lwt_canceler.cancel canceler >>= fun () -> - fail Timeout - end - let unless cond f = if cond then Lwt.return () else f () -let of_sockaddr = function - | Unix.ADDR_UNIX _ -> None - | Unix.ADDR_INET (addr, port) -> - match Ipaddr_unix.of_inet_addr addr with - | V4 addr -> Some (Ipaddr.v6_of_v4 addr, port) - | V6 addr -> Some (addr, port) -let getaddrinfo ~passive ~node ~service = - let open Lwt_unix in - getaddrinfo node service - ( AI_SOCKTYPE SOCK_STREAM :: - (if passive then [AI_PASSIVE] else []) ) >>= fun addr -> - let points = - TzList.filter_map - (fun { ai_addr ; _ } -> of_sockaddr ai_addr) - addr in - Lwt.return points diff --git a/src/lib_stdlib_lwt/lwt_utils.mli b/src/lib_stdlib_lwt/lwt_utils.mli index 87f2f9bc5..7d0331b49 100644 --- a/src/lib_stdlib_lwt/lwt_utils.mli +++ b/src/lib_stdlib_lwt/lwt_utils.mli @@ -20,39 +20,6 @@ val worker: val trigger: unit -> (unit -> unit) * (unit -> unit Lwt.t) val sort: ('a -> 'a -> int Lwt.t) -> 'a list -> 'a list Lwt.t -val read_bytes: - ?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t - -val read_mbytes: - ?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t - -val write_bytes: - ?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t -val write_mbytes: - ?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t - -val remove_dir: string -> unit Lwt.t -val create_dir: ?perm:int -> string -> unit Lwt.t -val create_file: ?perm:int -> string -> string -> unit Lwt.t - -val safe_close: Lwt_unix.file_descr -> unit Lwt.t - -open Error_monad - -type error += Canceled -val protect : - ?on_error:(error list -> 'a tzresult Lwt.t) -> - ?canceler:Lwt_canceler.t -> - (unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t - -type error += Timeout -val with_timeout: - ?canceler:Lwt_canceler.t -> - float -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t val unless: bool -> (unit -> unit Lwt.t) -> unit Lwt.t -val getaddrinfo: - passive:bool -> - node:string -> service:string -> - (Ipaddr.V6.t * int) list Lwt.t diff --git a/src/lib_stdlib_lwt/lwt_utils_unix.ml b/src/lib_stdlib_lwt/lwt_utils_unix.ml new file mode 100644 index 000000000..4cb7fc468 --- /dev/null +++ b/src/lib_stdlib_lwt/lwt_utils_unix.ml @@ -0,0 +1,168 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Lwt.Infix + +let read_bytes ?(pos = 0) ?len fd buf = + let len = match len with None -> Bytes.length buf - pos | Some l -> l in + let rec inner pos len = + if len = 0 then + Lwt.return_unit + else + Lwt_unix.read fd buf pos len >>= function + | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) + | nb_read -> inner (pos + nb_read) (len - nb_read) + in + inner pos len + +let read_mbytes ?(pos=0) ?len fd buf = + let len = match len with None -> MBytes.length buf - pos | Some l -> l in + let rec inner pos len = + if len = 0 then + Lwt.return_unit + else + Lwt_bytes.read fd buf pos len >>= function + | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) + | nb_read -> inner (pos + nb_read) (len - nb_read) + in + inner pos len + +let write_mbytes ?(pos=0) ?len descr buf = + let len = match len with None -> MBytes.length buf - pos | Some l -> l in + let rec inner pos len = + if len = 0 then + Lwt.return_unit + else + Lwt_bytes.write descr buf pos len >>= function + | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) + | nb_written -> inner (pos + nb_written) (len - nb_written) in + inner pos len + +let write_bytes ?(pos=0) ?len descr buf = + let len = match len with None -> Bytes.length buf - pos | Some l -> l in + let rec inner pos len = + if len = 0 then + Lwt.return_unit + else + Lwt_unix.write descr buf pos len >>= function + | 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *) + | nb_written -> inner (pos + nb_written) (len - nb_written) in + inner pos len + +let (>>=) = Lwt.bind + +let remove_dir dir = + let rec remove dir = + let files = Lwt_unix.files_of_directory dir in + Lwt_stream.iter_s + (fun file -> + if file = "." || file = ".." then + Lwt.return () + else begin + let file = Filename.concat dir file in + if Sys.is_directory file + then remove file + else Lwt_unix.unlink file + end) + files >>= fun () -> + Lwt_unix.rmdir dir in + if Sys.file_exists dir && Sys.is_directory dir then + remove dir + else + Lwt.return () + +let rec create_dir ?(perm = 0o755) dir = + Lwt_unix.file_exists dir >>= function + | false -> + create_dir (Filename.dirname dir) >>= fun () -> + Lwt_unix.mkdir dir perm + | true -> + Lwt_unix.stat dir >>= function + | { st_kind = S_DIR ; _ } -> Lwt.return_unit + | _ -> Pervasives.failwith "Not a directory" + +let create_file ?(perm = 0o644) name content = + Lwt_unix.openfile name Unix.([O_TRUNC; O_CREAT; O_WRONLY]) perm >>= fun fd -> + Lwt_unix.write_string fd content 0 (String.length content) >>= fun _ -> + Lwt_unix.close fd + +let safe_close fd = + Lwt.catch + (fun () -> Lwt_unix.close fd) + (fun _ -> Lwt.return_unit) + + + +let of_sockaddr = function + | Unix.ADDR_UNIX _ -> None + | Unix.ADDR_INET (addr, port) -> + match Ipaddr_unix.of_inet_addr addr with + | V4 addr -> Some (Ipaddr.v6_of_v4 addr, port) + | V6 addr -> Some (addr, port) + +let getaddrinfo ~passive ~node ~service = + let open Lwt_unix in + getaddrinfo node service + ( AI_SOCKTYPE SOCK_STREAM :: + (if passive then [AI_PASSIVE] else []) ) >>= fun addr -> + let points = + TzList.filter_map + (fun { ai_addr ; _ } -> of_sockaddr ai_addr) + addr in + Lwt.return points + +open Error_monad + +type error += Canceled + +let protect ?on_error ?canceler t = + let cancelation = + match canceler with + | None -> Lwt_utils.never_ending + | Some canceler -> + (Lwt_canceler.cancelation canceler >>= fun () -> + fail Canceled ) in + let res = + Lwt.pick [ cancelation ; + Lwt.catch t (fun exn -> fail (Exn exn)) ] in + res >>= function + | Ok _ -> res + | Error err -> + let canceled = + Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in + let err = if canceled then [Canceled] else err in + match on_error with + | None -> Lwt.return (Error err) + | Some on_error -> + Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn)) + +type error += Timeout + +let () = + register_error_kind + `Temporary + ~id:"utils.Timeout" + ~title:"Timeout" + ~description:"Timeout" + Data_encoding.unit + (function Timeout -> Some () | _ -> None) + (fun () -> Timeout) + +let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f = + let timeout = Lwt_unix.sleep timeout in + let target = f canceler in + Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () -> + Lwt_unix.yield () >>= fun () -> + if Lwt.state target <> Lwt.Sleep then begin + Lwt.cancel timeout ; + target + end else begin + Lwt_canceler.cancel canceler >>= fun () -> + fail Timeout + end diff --git a/src/lib_stdlib_lwt/lwt_utils_unix.mli b/src/lib_stdlib_lwt/lwt_utils_unix.mli new file mode 100644 index 000000000..7fc566739 --- /dev/null +++ b/src/lib_stdlib_lwt/lwt_utils_unix.mli @@ -0,0 +1,45 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +val read_bytes: + ?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t + +val read_mbytes: + ?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t + +val write_bytes: + ?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t +val write_mbytes: + ?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t + +val remove_dir: string -> unit Lwt.t +val create_dir: ?perm:int -> string -> unit Lwt.t +val create_file: ?perm:int -> string -> string -> unit Lwt.t + +val safe_close: Lwt_unix.file_descr -> unit Lwt.t + +val getaddrinfo: + passive:bool -> + node:string -> service:string -> + (Ipaddr.V6.t * int) list Lwt.t + +open Error_monad + +type error += Canceled + +val protect : + ?on_error:(error list -> 'a tzresult Lwt.t) -> + ?canceler:Lwt_canceler.t -> + (unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t + +type error += Timeout +val with_timeout: + ?canceler:Lwt_canceler.t -> + float -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t +