Client refactor: move part of Lwt_utils
in Lwt_utils_unix
This commit is contained in:
parent
f22b3576d2
commit
f61eed1a67
@ -537,7 +537,7 @@ let resolve_addr ?default_port ?(passive = false) peer =
|
|||||||
invalid_arg ""
|
invalid_arg ""
|
||||||
| "", Some default_port -> string_of_int default_port
|
| "", Some default_port -> string_of_int default_port
|
||||||
| port, _ -> port in
|
| port, _ -> port in
|
||||||
Lwt_utils.getaddrinfo ~passive ~node ~service
|
Lwt_utils_unix.getaddrinfo ~passive ~node ~service
|
||||||
|
|
||||||
let resolve_addrs ?default_port ?passive peers =
|
let resolve_addrs ?default_port ?passive peers =
|
||||||
Lwt_list.fold_left_s begin fun a peer ->
|
Lwt_list.fold_left_s begin fun a peer ->
|
||||||
|
@ -93,7 +93,7 @@ let init_logger ?verbosity (log_config : Node_config_file.log) =
|
|||||||
let init_node ?sandbox (config : Node_config_file.t) =
|
let init_node ?sandbox (config : Node_config_file.t) =
|
||||||
let patch_context json ctxt =
|
let patch_context json ctxt =
|
||||||
let module Proto = (val Registred_protocol.get_exn genesis.protocol) in
|
let module Proto = (val Registred_protocol.get_exn genesis.protocol) in
|
||||||
Lwt_utils.protect begin fun () ->
|
Lwt_utils_unix.protect begin fun () ->
|
||||||
Proto.configure_sandbox ctxt json
|
Proto.configure_sandbox ctxt json
|
||||||
end >|= function
|
end >|= function
|
||||||
| Error err ->
|
| Error err ->
|
||||||
|
@ -168,18 +168,18 @@ let read_dir dir =
|
|||||||
open Lwt.Infix
|
open Lwt.Infix
|
||||||
|
|
||||||
let create_files dir units =
|
let create_files dir units =
|
||||||
Lwt_utils.remove_dir dir >>= fun () ->
|
Lwt_utils_unix.remove_dir dir >>= fun () ->
|
||||||
Lwt_utils.create_dir dir >>= fun () ->
|
Lwt_utils_unix.create_dir dir >>= fun () ->
|
||||||
Lwt_list.map_s
|
Lwt_list.map_s
|
||||||
(fun { name ; interface ; implementation } ->
|
(fun { name ; interface ; implementation } ->
|
||||||
let name = String.lowercase_ascii name in
|
let name = String.lowercase_ascii name in
|
||||||
let ml = dir // (name ^ ".ml") in
|
let ml = dir // (name ^ ".ml") in
|
||||||
let mli = dir // (name ^ ".mli") in
|
let mli = dir // (name ^ ".mli") in
|
||||||
Lwt_utils.create_file ml implementation >>= fun () ->
|
Lwt_utils_unix.create_file ml implementation >>= fun () ->
|
||||||
match interface with
|
match interface with
|
||||||
| None -> Lwt.return [ml]
|
| None -> Lwt.return [ml]
|
||||||
| Some content ->
|
| Some content ->
|
||||||
Lwt_utils.create_file mli content >>= fun () ->
|
Lwt_utils_unix.create_file mli content >>= fun () ->
|
||||||
Lwt.return [ mli ; ml ])
|
Lwt.return [ mli ; ml ])
|
||||||
units >>= fun files ->
|
units >>= fun files ->
|
||||||
let files = List.concat files in
|
let files = List.concat files in
|
||||||
|
@ -91,7 +91,7 @@ class file_wallet dir : wallet = object (self)
|
|||||||
fun alias_name list encoding ->
|
fun alias_name list encoding ->
|
||||||
Lwt.catch
|
Lwt.catch
|
||||||
(fun () ->
|
(fun () ->
|
||||||
Lwt_utils.create_dir dir >>= fun () ->
|
Lwt_utils_unix.create_dir dir >>= fun () ->
|
||||||
let filename = self#filename alias_name in
|
let filename = self#filename alias_name in
|
||||||
let json = Data_encoding.Json.construct encoding list in
|
let json = Data_encoding.Json.construct encoding list in
|
||||||
Data_encoding_ezjsonm.write_file filename json)
|
Data_encoding_ezjsonm.write_file filename json)
|
||||||
@ -127,7 +127,7 @@ let default_log ~base_dir channel msg =
|
|||||||
Lwt.return ()
|
Lwt.return ()
|
||||||
| log ->
|
| log ->
|
||||||
let (//) = Filename.concat in
|
let (//) = Filename.concat in
|
||||||
Lwt_utils.create_dir (base_dir // "logs" // log) >>= fun () ->
|
Lwt_utils_unix.create_dir (base_dir // "logs" // log) >>= fun () ->
|
||||||
Lwt_io.with_file
|
Lwt_io.with_file
|
||||||
~flags: Unix.[ O_APPEND ; O_CREAT ; O_WRONLY ]
|
~flags: Unix.[ O_APPEND ; O_CREAT ; O_WRONLY ]
|
||||||
~mode: Lwt_io.Output
|
~mode: Lwt_io.Output
|
||||||
|
@ -120,7 +120,7 @@ module Scheduler(IO : IO) = struct
|
|||||||
false, (Queue.pop st.readys_low)
|
false, (Queue.pop st.readys_low)
|
||||||
in
|
in
|
||||||
match msg with
|
match msg with
|
||||||
| Error [Lwt_utils.Canceled] ->
|
| Error [ Lwt_utils_unix.Canceled ] ->
|
||||||
worker_loop st
|
worker_loop st
|
||||||
| Error ([Connection_closed |
|
| Error ([Connection_closed |
|
||||||
Exn ( Lwt_pipe.Closed |
|
Exn ( Lwt_pipe.Closed |
|
||||||
@ -140,7 +140,7 @@ module Scheduler(IO : IO) = struct
|
|||||||
conn.current_push <- begin
|
conn.current_push <- begin
|
||||||
IO.push conn.out_param msg >>= function
|
IO.push conn.out_param msg >>= function
|
||||||
| Ok ()
|
| Ok ()
|
||||||
| Error [Lwt_utils.Canceled] ->
|
| Error [ Lwt_utils_unix.Canceled ] ->
|
||||||
return ()
|
return ()
|
||||||
| Error ([Connection_closed |
|
| Error ([Connection_closed |
|
||||||
Exn (Unix.Unix_error (EBADF, _, _) |
|
Exn (Unix.Unix_error (EBADF, _, _) |
|
||||||
@ -264,7 +264,7 @@ module WriteScheduler = Scheduler(struct
|
|||||||
let push fd buf =
|
let push fd buf =
|
||||||
Lwt.catch
|
Lwt.catch
|
||||||
(fun () ->
|
(fun () ->
|
||||||
Lwt_utils.write_mbytes fd buf >>= return)
|
Lwt_utils_unix.write_mbytes fd buf >>= return)
|
||||||
(function
|
(function
|
||||||
| Unix.Unix_error(Unix.ECONNRESET, _, _)
|
| Unix.Unix_error(Unix.ECONNRESET, _, _)
|
||||||
| Unix.Unix_error(Unix.EPIPE, _, _)
|
| Unix.Unix_error(Unix.EPIPE, _, _)
|
||||||
@ -357,7 +357,7 @@ let register =
|
|||||||
let cpt = ref 0 in
|
let cpt = ref 0 in
|
||||||
fun st conn ->
|
fun st conn ->
|
||||||
if st.closed then begin
|
if st.closed then begin
|
||||||
Lwt.async (fun () -> Lwt_utils.safe_close conn) ;
|
Lwt.async (fun () -> Lwt_utils_unix.safe_close conn) ;
|
||||||
raise Closed
|
raise Closed
|
||||||
end else begin
|
end else begin
|
||||||
let id = incr cpt; !cpt in
|
let id = incr cpt; !cpt in
|
||||||
@ -380,7 +380,7 @@ let register =
|
|||||||
Moving_average.destroy write_conn.counter ;
|
Moving_average.destroy write_conn.counter ;
|
||||||
Lwt_pipe.close write_queue ;
|
Lwt_pipe.close write_queue ;
|
||||||
Lwt_pipe.close read_queue ;
|
Lwt_pipe.close read_queue ;
|
||||||
Lwt_utils.safe_close conn
|
Lwt_utils_unix.safe_close conn
|
||||||
end ;
|
end ;
|
||||||
let conn = {
|
let conn = {
|
||||||
sched = st ; id ; conn ; canceler ;
|
sched = st ; id ; conn ; canceler ;
|
||||||
@ -481,7 +481,7 @@ let close ?timeout conn =
|
|||||||
| None ->
|
| None ->
|
||||||
return (Lwt_canceler.cancelation conn.canceler)
|
return (Lwt_canceler.cancelation conn.canceler)
|
||||||
| Some timeout ->
|
| Some timeout ->
|
||||||
Lwt_utils.with_timeout
|
Lwt_utils_unix.with_timeout
|
||||||
~canceler:conn.canceler timeout begin fun canceler ->
|
~canceler:conn.canceler timeout begin fun canceler ->
|
||||||
return (Lwt_canceler.cancelation canceler)
|
return (Lwt_canceler.cancelation canceler)
|
||||||
end
|
end
|
||||||
|
@ -119,7 +119,7 @@ and too_few_connections st n_connected =
|
|||||||
discover the local network and then wait *)
|
discover the local network and then wait *)
|
||||||
Option.iter ~f:P2p_discovery.restart st.disco ;
|
Option.iter ~f:P2p_discovery.restart st.disco ;
|
||||||
P2p_pool.broadcast_bootstrap_msg pool ;
|
P2p_pool.broadcast_bootstrap_msg pool ;
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Lwt.pick [
|
Lwt.pick [
|
||||||
P2p_pool.Pool_event.wait_new_peer pool ;
|
P2p_pool.Pool_event.wait_new_peer pool ;
|
||||||
Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
|
Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
|
||||||
@ -146,7 +146,7 @@ and too_many_connections st n_connected =
|
|||||||
let rec worker_loop st =
|
let rec worker_loop st =
|
||||||
let Pool pool = st.pool in
|
let Pool pool = st.pool in
|
||||||
begin
|
begin
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Lwt.pick [
|
Lwt.pick [
|
||||||
Lwt_unix.sleep 120. ; (* every two minutes *)
|
Lwt_unix.sleep 120. ; (* every two minutes *)
|
||||||
Lwt_condition.wait st.please_maintain ; (* when asked *)
|
Lwt_condition.wait st.please_maintain ; (* when asked *)
|
||||||
@ -165,7 +165,7 @@ let rec worker_loop st =
|
|||||||
end
|
end
|
||||||
end >>= function
|
end >>= function
|
||||||
| Ok () -> worker_loop st
|
| Ok () -> worker_loop st
|
||||||
| Error [Lwt_utils.Canceled] -> Lwt.return_unit
|
| Error [ Lwt_utils_unix.Canceled ] -> Lwt.return_unit
|
||||||
| Error _ -> Lwt.return_unit
|
| Error _ -> Lwt.return_unit
|
||||||
|
|
||||||
let run ~connection_timeout bounds pool disco =
|
let run ~connection_timeout bounds pool disco =
|
||||||
|
@ -87,7 +87,7 @@ module Answerer = struct
|
|||||||
|
|
||||||
let rec worker_loop st =
|
let rec worker_loop st =
|
||||||
Lwt_unix.yield () >>= fun () ->
|
Lwt_unix.yield () >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
P2p_socket.read st.conn
|
P2p_socket.read st.conn
|
||||||
end >>= function
|
end >>= function
|
||||||
| Ok (_, Bootstrap) -> begin
|
| Ok (_, Bootstrap) -> begin
|
||||||
@ -122,7 +122,7 @@ module Answerer = struct
|
|||||||
(* TODO: Penalize peer... *)
|
(* TODO: Penalize peer... *)
|
||||||
Lwt_canceler.cancel st.canceler >>= fun () ->
|
Lwt_canceler.cancel st.canceler >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error [Lwt_utils.Canceled] ->
|
| Error [Lwt_utils_unix.Canceled] ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error err ->
|
| Error err ->
|
||||||
lwt_log_error "@[Answerer unexpected error:@ %a@]"
|
lwt_log_error "@[Answerer unexpected error:@ %a@]"
|
||||||
@ -568,7 +568,7 @@ let rec connect ~timeout pool point =
|
|||||||
(active_connections pool <= pool.config.max_connections)
|
(active_connections pool <= pool.config.max_connections)
|
||||||
Too_many_connections >>=? fun () ->
|
Too_many_connections >>=? fun () ->
|
||||||
let canceler = Lwt_canceler.create () in
|
let canceler = Lwt_canceler.create () in
|
||||||
Lwt_utils.with_timeout ~canceler timeout begin fun canceler ->
|
Lwt_utils_unix.with_timeout ~canceler timeout begin fun canceler ->
|
||||||
let point_info =
|
let point_info =
|
||||||
register_point pool pool.config.identity.peer_id point in
|
register_point pool pool.config.identity.peer_id point in
|
||||||
let addr, port as point = P2p_point_state.Info.point point_info in
|
let addr, port as point = P2p_point_state.Info.point point_info in
|
||||||
@ -581,14 +581,14 @@ let rec connect ~timeout pool point =
|
|||||||
let uaddr =
|
let uaddr =
|
||||||
Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
|
Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
|
||||||
lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () ->
|
lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler begin fun () ->
|
||||||
log pool (Outgoing_connection point) ;
|
log pool (Outgoing_connection point) ;
|
||||||
Lwt_unix.connect fd uaddr >>= fun () ->
|
Lwt_unix.connect fd uaddr >>= fun () ->
|
||||||
return ()
|
return ()
|
||||||
end ~on_error: begin fun err ->
|
end ~on_error: begin fun err ->
|
||||||
lwt_debug "connect: %a -> disconnect" P2p_point.Id.pp point >>= fun () ->
|
lwt_debug "connect: %a -> disconnect" P2p_point.Id.pp point >>= fun () ->
|
||||||
P2p_point_state.set_disconnected point_info ;
|
P2p_point_state.set_disconnected point_info ;
|
||||||
Lwt_utils.safe_close fd >>= fun () ->
|
Lwt_utils_unix.safe_close fd >>= fun () ->
|
||||||
match err with
|
match err with
|
||||||
| [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] ->
|
| [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] ->
|
||||||
fail Connection_refused
|
fail Connection_refused
|
||||||
@ -603,7 +603,7 @@ and authenticate pool ?point_info canceler fd point =
|
|||||||
lwt_debug "authenticate: %a%s"
|
lwt_debug "authenticate: %a%s"
|
||||||
P2p_point.Id.pp point
|
P2p_point.Id.pp point
|
||||||
(if incoming then " incoming" else "") >>= fun () ->
|
(if incoming then " incoming" else "") >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler begin fun () ->
|
||||||
P2p_socket.authenticate
|
P2p_socket.authenticate
|
||||||
~proof_of_work_target:pool.config.proof_of_work_target
|
~proof_of_work_target:pool.config.proof_of_work_target
|
||||||
~incoming (P2p_io_scheduler.register pool.io_sched fd) point
|
~incoming (P2p_io_scheduler.register pool.io_sched fd) point
|
||||||
@ -612,7 +612,7 @@ and authenticate pool ?point_info canceler fd point =
|
|||||||
end ~on_error: begin fun err ->
|
end ~on_error: begin fun err ->
|
||||||
(* TODO do something when the error is Not_enough_proof_of_work ?? *)
|
(* TODO do something when the error is Not_enough_proof_of_work ?? *)
|
||||||
begin match err with
|
begin match err with
|
||||||
| [ Lwt_utils.Canceled ] ->
|
| [ Lwt_utils_unix.Canceled ] ->
|
||||||
(* Currently only on time out *)
|
(* Currently only on time out *)
|
||||||
lwt_debug "authenticate: %a%s -> canceled"
|
lwt_debug "authenticate: %a%s -> canceled"
|
||||||
P2p_point.Id.pp point
|
P2p_point.Id.pp point
|
||||||
@ -682,7 +682,7 @@ and authenticate pool ?point_info canceler fd point =
|
|||||||
lwt_debug "authenticate: %a -> accept %a"
|
lwt_debug "authenticate: %a -> accept %a"
|
||||||
P2p_point.Id.pp point
|
P2p_point.Id.pp point
|
||||||
P2p_connection.Info.pp info >>= fun () ->
|
P2p_connection.Info.pp info >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler begin fun () ->
|
||||||
P2p_socket.accept
|
P2p_socket.accept
|
||||||
?incoming_message_queue_size:pool.config.incoming_message_queue_size
|
?incoming_message_queue_size:pool.config.incoming_message_queue_size
|
||||||
?outgoing_message_queue_size:pool.config.outgoing_message_queue_size
|
?outgoing_message_queue_size:pool.config.outgoing_message_queue_size
|
||||||
@ -881,7 +881,7 @@ and swap pool conn current_peer_id new_point =
|
|||||||
pool.latest_accepted_swap <- pool.latest_succesfull_swap ;
|
pool.latest_accepted_swap <- pool.latest_succesfull_swap ;
|
||||||
log pool (Swap_failure { source = source_peer_id }) ;
|
log pool (Swap_failure { source = source_peer_id }) ;
|
||||||
match err with
|
match err with
|
||||||
| [ Lwt_utils.Timeout ] ->
|
| [ Lwt_utils_unix.Timeout ] ->
|
||||||
lwt_debug "Swap to %a was interupted: %a"
|
lwt_debug "Swap to %a was interupted: %a"
|
||||||
P2p_point.Id.pp new_point pp_print_error err
|
P2p_point.Id.pp new_point pp_print_error err
|
||||||
| _ ->
|
| _ ->
|
||||||
@ -893,12 +893,12 @@ let accept pool fd point =
|
|||||||
log pool (Incoming_connection point) ;
|
log pool (Incoming_connection point) ;
|
||||||
if pool.config.max_incoming_connections <= P2p_point.Table.length pool.incoming
|
if pool.config.max_incoming_connections <= P2p_point.Table.length pool.incoming
|
||||||
|| pool.config.max_connections <= active_connections pool then
|
|| pool.config.max_connections <= active_connections pool then
|
||||||
Lwt.async (fun () -> Lwt_utils.safe_close fd)
|
Lwt.async (fun () -> Lwt_utils_unix.safe_close fd)
|
||||||
else
|
else
|
||||||
let canceler = Lwt_canceler.create () in
|
let canceler = Lwt_canceler.create () in
|
||||||
P2p_point.Table.add pool.incoming point canceler ;
|
P2p_point.Table.add pool.incoming point canceler ;
|
||||||
Lwt.async begin fun () ->
|
Lwt.async begin fun () ->
|
||||||
Lwt_utils.with_timeout
|
Lwt_utils_unix.with_timeout
|
||||||
~canceler pool.config.authentification_timeout
|
~canceler pool.config.authentification_timeout
|
||||||
(fun canceler -> authenticate pool canceler fd point)
|
(fun canceler -> authenticate pool canceler fd point)
|
||||||
end
|
end
|
||||||
|
@ -249,7 +249,7 @@ module Reader = struct
|
|||||||
lwt_debug "[read_message] incremental decoding error" >>= fun () ->
|
lwt_debug "[read_message] incremental decoding error" >>= fun () ->
|
||||||
return None
|
return None
|
||||||
| Await decode_next_buf ->
|
| Await decode_next_buf ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data
|
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data
|
||||||
end >>=? fun buf ->
|
end >>=? fun buf ->
|
||||||
lwt_debug
|
lwt_debug
|
||||||
@ -265,12 +265,12 @@ module Reader = struct
|
|||||||
read_message st init_mbytes >>=? fun msg ->
|
read_message st init_mbytes >>=? fun msg ->
|
||||||
match msg with
|
match msg with
|
||||||
| None ->
|
| None ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
|
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
|
||||||
return None
|
return None
|
||||||
end
|
end
|
||||||
| Some (msg, size, rem_mbytes) ->
|
| Some (msg, size, rem_mbytes) ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
|
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
|
||||||
return (Some rem_mbytes)
|
return (Some rem_mbytes)
|
||||||
end
|
end
|
||||||
@ -280,7 +280,7 @@ module Reader = struct
|
|||||||
| Ok None ->
|
| Ok None ->
|
||||||
Lwt_canceler.cancel st.canceler >>= fun () ->
|
Lwt_canceler.cancel st.canceler >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
lwt_debug "connection closed to %a"
|
lwt_debug "connection closed to %a"
|
||||||
P2p_connection.Info.pp st.conn.info >>= fun () ->
|
P2p_connection.Info.pp st.conn.info >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
@ -331,7 +331,7 @@ module Writer = struct
|
|||||||
let rec loop = function
|
let rec loop = function
|
||||||
| [] -> return ()
|
| [] -> return ()
|
||||||
| buf :: l ->
|
| buf :: l ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf
|
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf
|
||||||
end >>=? fun () ->
|
end >>=? fun () ->
|
||||||
lwt_debug "writing %d bytes to %a"
|
lwt_debug "writing %d bytes to %a"
|
||||||
@ -345,10 +345,10 @@ module Writer = struct
|
|||||||
|
|
||||||
let rec worker_loop st =
|
let rec worker_loop st =
|
||||||
Lwt_unix.yield () >>= fun () ->
|
Lwt_unix.yield () >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Lwt_pipe.pop st.messages >>= return
|
Lwt_pipe.pop st.messages >>= return
|
||||||
end >>= function
|
end >>= function
|
||||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
lwt_debug "connection closed to %a"
|
lwt_debug "connection closed to %a"
|
||||||
P2p_connection.Info.pp st.conn.info >>= fun () ->
|
P2p_connection.Info.pp st.conn.info >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
@ -370,7 +370,7 @@ module Writer = struct
|
|||||||
Lwt.wakeup_later u
|
Lwt.wakeup_later u
|
||||||
(Error [P2p_io_scheduler.Connection_closed])) ;
|
(Error [P2p_io_scheduler.Connection_closed])) ;
|
||||||
match err with
|
match err with
|
||||||
| [ Lwt_utils.Canceled | Exn Lwt_pipe.Closed ] ->
|
| [ Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed ] ->
|
||||||
lwt_debug "connection closed to %a"
|
lwt_debug "connection closed to %a"
|
||||||
P2p_connection.Info.pp st.conn.info >>= fun () ->
|
P2p_connection.Info.pp st.conn.info >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
@ -453,7 +453,7 @@ let info { conn } = conn.info
|
|||||||
let accept
|
let accept
|
||||||
?incoming_message_queue_size ?outgoing_message_queue_size
|
?incoming_message_queue_size ?outgoing_message_queue_size
|
||||||
?binary_chunks_size (fd, info, cryptobox_data) encoding =
|
?binary_chunks_size (fd, info, cryptobox_data) encoding =
|
||||||
Lwt_utils.protect begin fun () ->
|
Lwt_utils_unix.protect begin fun () ->
|
||||||
Ack.write fd cryptobox_data Ack >>=? fun () ->
|
Ack.write fd cryptobox_data Ack >>=? fun () ->
|
||||||
Ack.read fd cryptobox_data
|
Ack.read fd cryptobox_data
|
||||||
end ~on_error:begin fun err ->
|
end ~on_error:begin fun err ->
|
||||||
|
@ -21,7 +21,7 @@ type t = {
|
|||||||
let rec worker_loop st =
|
let rec worker_loop st =
|
||||||
let Pool pool = st.pool in
|
let Pool pool = st.pool in
|
||||||
Lwt_unix.yield () >>= fun () ->
|
Lwt_unix.yield () >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () ->
|
||||||
Lwt_unix.accept st.socket >>= return
|
Lwt_unix.accept st.socket >>= return
|
||||||
end >>= function
|
end >>= function
|
||||||
| Ok (fd, addr) ->
|
| Ok (fd, addr) ->
|
||||||
@ -32,7 +32,7 @@ let rec worker_loop st =
|
|||||||
(Ipaddr_unix.V6.of_inet_addr_exn addr, port) in
|
(Ipaddr_unix.V6.of_inet_addr_exn addr, port) in
|
||||||
P2p_pool.accept pool fd point ;
|
P2p_pool.accept pool fd point ;
|
||||||
worker_loop st
|
worker_loop st
|
||||||
| Error [Lwt_utils.Canceled] ->
|
| Error [Lwt_utils_unix.Canceled] ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error err ->
|
| Error err ->
|
||||||
lwt_log_error "@[<v 2>Unexpected error in the Welcome worker@ %a@]"
|
lwt_log_error "@[<v 2>Unexpected error in the Welcome worker@ %a@]"
|
||||||
@ -53,7 +53,7 @@ let run ~backlog pool ?addr port =
|
|||||||
~backlog ?addr port >>= fun socket ->
|
~backlog ?addr port >>= fun socket ->
|
||||||
let canceler = Lwt_canceler.create () in
|
let canceler = Lwt_canceler.create () in
|
||||||
Lwt_canceler.on_cancel canceler begin fun () ->
|
Lwt_canceler.on_cancel canceler begin fun () ->
|
||||||
Lwt_utils.safe_close socket
|
Lwt_utils_unix.safe_close socket
|
||||||
end ;
|
end ;
|
||||||
let st = {
|
let st = {
|
||||||
socket ; canceler ; pool = Pool pool ;
|
socket ; canceler ; pool = Pool pool ;
|
||||||
|
@ -151,7 +151,7 @@ let run
|
|||||||
let client n =
|
let client n =
|
||||||
let prefix = Printf.sprintf "client(%d): " n in
|
let prefix = Printf.sprintf "client(%d): " n in
|
||||||
Process.detach ~prefix begin fun _ ->
|
Process.detach ~prefix begin fun _ ->
|
||||||
Lwt_utils.safe_close main_socket >>= fun () ->
|
Lwt_utils_unix.safe_close main_socket >>= fun () ->
|
||||||
client ?max_upload_speed ?write_queue_size addr port time n
|
client ?max_upload_speed ?write_queue_size addr port time n
|
||||||
end in
|
end in
|
||||||
Lwt_list.map_p client (1 -- n) >>= fun client_nodes ->
|
Lwt_list.map_p client (1 -- n) >>= fun client_nodes ->
|
||||||
|
@ -122,8 +122,8 @@ module Simple = struct
|
|||||||
| Error ([ P2p_pool.Connection_refused
|
| Error ([ P2p_pool.Connection_refused
|
||||||
| P2p_pool.Pending_connection
|
| P2p_pool.Pending_connection
|
||||||
| P2p_socket.Rejected
|
| P2p_socket.Rejected
|
||||||
| Lwt_utils.Canceled
|
| Lwt_utils_unix.Canceled
|
||||||
| Lwt_utils.Timeout
|
| Lwt_utils_unix.Timeout
|
||||||
| P2p_pool.Rejected _ as err ]) ->
|
| P2p_pool.Rejected _ as err ]) ->
|
||||||
lwt_log_info "Connection to %a failed (%a)"
|
lwt_log_info "Connection to %a failed (%a)"
|
||||||
P2p_point.Id.pp point
|
P2p_point.Id.pp point
|
||||||
@ -134,9 +134,9 @@ module Simple = struct
|
|||||||
Format.fprintf ppf "pending connection"
|
Format.fprintf ppf "pending connection"
|
||||||
| P2p_socket.Rejected ->
|
| P2p_socket.Rejected ->
|
||||||
Format.fprintf ppf "rejected"
|
Format.fprintf ppf "rejected"
|
||||||
| Lwt_utils.Canceled ->
|
| Lwt_utils_unix.Canceled ->
|
||||||
Format.fprintf ppf "canceled"
|
Format.fprintf ppf "canceled"
|
||||||
| Lwt_utils.Timeout ->
|
| Lwt_utils_unix.Timeout ->
|
||||||
Format.fprintf ppf "timeout"
|
Format.fprintf ppf "timeout"
|
||||||
| P2p_pool.Rejected peer ->
|
| P2p_pool.Rejected peer ->
|
||||||
Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer
|
Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer
|
||||||
|
@ -76,7 +76,7 @@ let run_nodes client server =
|
|||||||
return ()
|
return ()
|
||||||
end >>= fun server_node ->
|
end >>= fun server_node ->
|
||||||
Process.detach ~prefix:"client: " begin fun channel ->
|
Process.detach ~prefix:"client: " begin fun channel ->
|
||||||
Lwt_utils.safe_close main_socket >>= fun () ->
|
Lwt_utils_unix.safe_close main_socket >>= fun () ->
|
||||||
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
|
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
|
||||||
client channel sched default_addr port >>=? fun () ->
|
client channel sched default_addr port >>=? fun () ->
|
||||||
P2p_io_scheduler.shutdown sched >>= fun () ->
|
P2p_io_scheduler.shutdown sched >>= fun () ->
|
||||||
|
@ -174,12 +174,12 @@ let main { compile_ml ; pack_objects ; link_shared } =
|
|||||||
match !build_dir with
|
match !build_dir with
|
||||||
| None ->
|
| None ->
|
||||||
let dir = mktemp_dir () in
|
let dir = mktemp_dir () in
|
||||||
at_exit (fun () -> Lwt_main.run (Lwt_utils.remove_dir dir)) ;
|
at_exit (fun () -> Lwt_main.run (Lwt_utils_unix.remove_dir dir)) ;
|
||||||
dir
|
dir
|
||||||
| Some dir -> dir in
|
| Some dir -> dir in
|
||||||
Lwt_main.run (Lwt_utils.create_dir ~perm:0o755 build_dir) ;
|
|
||||||
Lwt_main.run (Lwt_utils.create_dir ~perm:0o755 (Filename.dirname output)) ;
|
|
||||||
let hash, protocol = Protocol.read_dir source_dir in
|
let hash, protocol = Protocol.read_dir source_dir in
|
||||||
|
Lwt_main.run (Lwt_utils_unix.create_dir ~perm:0o755 build_dir) ;
|
||||||
|
Lwt_main.run (Lwt_utils_unix.create_dir ~perm:0o755 (Filename.dirname output)) ;
|
||||||
(* Generate the 'functor' *)
|
(* Generate the 'functor' *)
|
||||||
let functor_file = build_dir // "functor.ml" in
|
let functor_file = build_dir // "functor.ml" in
|
||||||
let oc = open_out functor_file in
|
let oc = open_out functor_file in
|
||||||
|
@ -241,7 +241,7 @@ let on_request
|
|||||||
net_state header.shell.predecessor >>=? fun pred ->
|
net_state header.shell.predecessor >>=? fun pred ->
|
||||||
get_proto pred hash >>=? fun proto ->
|
get_proto pred hash >>=? fun proto ->
|
||||||
(* TODO also protect with [Worker.canceler w]. *)
|
(* TODO also protect with [Worker.canceler w]. *)
|
||||||
Lwt_utils.protect ?canceler begin fun () ->
|
Lwt_utils_unix.protect ?canceler begin fun () ->
|
||||||
apply_block
|
apply_block
|
||||||
(Distributed_db.net_state net_db)
|
(Distributed_db.net_state net_db)
|
||||||
pred proto hash header operations
|
pred proto hash header operations
|
||||||
@ -264,7 +264,7 @@ let on_request
|
|||||||
end
|
end
|
||||||
(* TODO catch other temporary error (e.g. system errors)
|
(* TODO catch other temporary error (e.g. system errors)
|
||||||
and do not 'commit' them on disk... *)
|
and do not 'commit' them on disk... *)
|
||||||
| Error [Lwt_utils.Canceled | Unavailable_protocol _] as err ->
|
| Error [Lwt_utils_unix.Canceled | Unavailable_protocol _] as err ->
|
||||||
return err
|
return err
|
||||||
| Error errors ->
|
| Error errors ->
|
||||||
Worker.protect w begin fun () ->
|
Worker.protect w begin fun () ->
|
||||||
|
@ -55,7 +55,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) =
|
|||||||
lwt_debug "fetching block header %a from peer %a."
|
lwt_debug "fetching block header %a from peer %a."
|
||||||
Block_hash.pp_short hash
|
Block_hash.pp_short hash
|
||||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Distributed_db.Block_header.fetch
|
Distributed_db.Block_header.fetch
|
||||||
~timeout:pipeline.block_header_timeout
|
~timeout:pipeline.block_header_timeout
|
||||||
pipeline.net_db ~peer:pipeline.peer_id
|
pipeline.net_db ~peer:pipeline.peer_id
|
||||||
@ -69,7 +69,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) =
|
|||||||
fetch_loop [] step.block step.step >>=? fun headers ->
|
fetch_loop [] step.block step.step >>=? fun headers ->
|
||||||
iter_s
|
iter_s
|
||||||
begin fun header ->
|
begin fun header ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Lwt_pipe.push pipeline.fetched_headers header >>= return
|
Lwt_pipe.push pipeline.fetched_headers header >>= return
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -87,7 +87,7 @@ let headers_fetch_worker_loop pipeline =
|
|||||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||||
Lwt_pipe.close pipeline.fetched_headers ;
|
Lwt_pipe.close pipeline.fetched_headers ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error [ Distributed_db.Block_header.Timeout bh ] ->
|
| Error [ Distributed_db.Block_header.Timeout bh ] ->
|
||||||
lwt_log_info "request for header %a from peer %a timed out."
|
lwt_log_info "request for header %a from peer %a timed out."
|
||||||
@ -105,7 +105,7 @@ let headers_fetch_worker_loop pipeline =
|
|||||||
let rec operations_fetch_worker_loop pipeline =
|
let rec operations_fetch_worker_loop pipeline =
|
||||||
begin
|
begin
|
||||||
Lwt_unix.yield () >>= fun () ->
|
Lwt_unix.yield () >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Lwt_pipe.pop pipeline.fetched_headers >>= return
|
Lwt_pipe.pop pipeline.fetched_headers >>= return
|
||||||
end >>=? fun (hash, header) ->
|
end >>=? fun (hash, header) ->
|
||||||
lwt_log_info "fetching operations of block %a from peer %a."
|
lwt_log_info "fetching operations of block %a from peer %a."
|
||||||
@ -113,7 +113,7 @@ let rec operations_fetch_worker_loop pipeline =
|
|||||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||||
map_p
|
map_p
|
||||||
(fun i ->
|
(fun i ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Distributed_db.Operations.fetch
|
Distributed_db.Operations.fetch
|
||||||
~timeout:pipeline.block_operations_timeout
|
~timeout:pipeline.block_operations_timeout
|
||||||
pipeline.net_db ~peer:pipeline.peer_id
|
pipeline.net_db ~peer:pipeline.peer_id
|
||||||
@ -123,14 +123,14 @@ let rec operations_fetch_worker_loop pipeline =
|
|||||||
lwt_log_info "fetched operations of block %a from peer %a."
|
lwt_log_info "fetched operations of block %a from peer %a."
|
||||||
Block_hash.pp_short hash
|
Block_hash.pp_short hash
|
||||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Lwt_pipe.push pipeline.fetched_blocks
|
Lwt_pipe.push pipeline.fetched_blocks
|
||||||
(hash, header, operations) >>= return
|
(hash, header, operations) >>= return
|
||||||
end
|
end
|
||||||
end >>= function
|
end >>= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
operations_fetch_worker_loop pipeline
|
operations_fetch_worker_loop pipeline
|
||||||
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
Lwt_pipe.close pipeline.fetched_blocks ;
|
Lwt_pipe.close pipeline.fetched_blocks ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error [ Distributed_db.Operations.Timeout (bh, n) ] ->
|
| Error [ Distributed_db.Operations.Timeout (bh, n) ] ->
|
||||||
@ -149,13 +149,13 @@ let rec operations_fetch_worker_loop pipeline =
|
|||||||
let rec validation_worker_loop pipeline =
|
let rec validation_worker_loop pipeline =
|
||||||
begin
|
begin
|
||||||
Lwt_unix.yield () >>= fun () ->
|
Lwt_unix.yield () >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Lwt_pipe.pop pipeline.fetched_blocks >>= return
|
Lwt_pipe.pop pipeline.fetched_blocks >>= return
|
||||||
end >>=? fun (hash, header, operations) ->
|
end >>=? fun (hash, header, operations) ->
|
||||||
lwt_log_info "requesting validation for block %a from peer %a."
|
lwt_log_info "requesting validation for block %a from peer %a."
|
||||||
Block_hash.pp_short hash
|
Block_hash.pp_short hash
|
||||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||||
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () ->
|
||||||
Block_validator.validate
|
Block_validator.validate
|
||||||
~canceler:pipeline.canceler
|
~canceler:pipeline.canceler
|
||||||
~notify_new_block:pipeline.notify_new_block
|
~notify_new_block:pipeline.notify_new_block
|
||||||
@ -168,7 +168,7 @@ let rec validation_worker_loop pipeline =
|
|||||||
return ()
|
return ()
|
||||||
end >>= function
|
end >>= function
|
||||||
| Ok () -> validation_worker_loop pipeline
|
| Ok () -> validation_worker_loop pipeline
|
||||||
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error ([ Block_validator_errors.Invalid_block _
|
| Error ([ Block_validator_errors.Invalid_block _
|
||||||
| Block_validator_errors.Unavailable_protocol _ ] as err ) ->
|
| Block_validator_errors.Unavailable_protocol _ ] as err ) ->
|
||||||
|
@ -629,7 +629,7 @@ module P2p_reader = struct
|
|||||||
end
|
end
|
||||||
|
|
||||||
let rec worker_loop global_db state =
|
let rec worker_loop global_db state =
|
||||||
Lwt_utils.protect ~canceler:state.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:state.canceler begin fun () ->
|
||||||
P2p.recv global_db.p2p state.conn
|
P2p.recv global_db.p2p state.conn
|
||||||
end >>= function
|
end >>= function
|
||||||
| Ok msg ->
|
| Ok msg ->
|
||||||
|
@ -78,7 +78,7 @@ let () =
|
|||||||
|
|
||||||
let rec worker_loop bv =
|
let rec worker_loop bv =
|
||||||
begin
|
begin
|
||||||
Lwt_utils.protect ~canceler:bv.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:bv.canceler begin fun () ->
|
||||||
Lwt_pipe.pop bv.messages >>= return
|
Lwt_pipe.pop bv.messages >>= return
|
||||||
end >>=? function Message (request, wakener) ->
|
end >>=? function Message (request, wakener) ->
|
||||||
match request with
|
match request with
|
||||||
@ -115,7 +115,7 @@ let rec worker_loop bv =
|
|||||||
end >>= function
|
end >>= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
worker_loop bv
|
worker_loop bv
|
||||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
lwt_log_notice "terminating" >>= fun () ->
|
lwt_log_notice "terminating" >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error err ->
|
| Error err ->
|
||||||
|
@ -202,7 +202,7 @@ module Make
|
|||||||
Lwt.ignore_result (Lwt_canceler.cancel w.canceler)
|
Lwt.ignore_result (Lwt_canceler.cancel w.canceler)
|
||||||
|
|
||||||
let protect { canceler } ?on_error f =
|
let protect { canceler } ?on_error f =
|
||||||
Lwt_utils.protect ?on_error ~canceler f
|
Lwt_utils_unix.protect ?on_error ~canceler f
|
||||||
|
|
||||||
let canceler { canceler } = canceler
|
let canceler { canceler } = canceler
|
||||||
|
|
||||||
@ -270,7 +270,7 @@ module Make
|
|||||||
Lwt.return_unit in
|
Lwt.return_unit in
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
begin
|
begin
|
||||||
Lwt_utils.protect ~canceler:w.canceler begin fun () ->
|
Lwt_utils_unix.protect ~canceler:w.canceler begin fun () ->
|
||||||
pop w
|
pop w
|
||||||
end >>=? function
|
end >>=? function
|
||||||
| None -> Handlers.on_no_request w
|
| None -> Handlers.on_no_request w
|
||||||
@ -301,7 +301,7 @@ module Make
|
|||||||
end >>= function
|
end >>= function
|
||||||
| Ok () ->
|
| Ok () ->
|
||||||
loop ()
|
loop ()
|
||||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] ->
|
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] ->
|
||||||
Logger.lwt_log_info
|
Logger.lwt_log_info
|
||||||
"[%a] worker terminated"
|
"[%a] worker terminated"
|
||||||
Name.pp w.name >>= fun () ->
|
Name.pp w.name >>= fun () ->
|
||||||
|
@ -34,7 +34,7 @@ let blocking_create
|
|||||||
create_inner Unix.F_LOCK ~close_on_exec ~unlink_on_exit fn in
|
create_inner Unix.F_LOCK ~close_on_exec ~unlink_on_exit fn in
|
||||||
match timeout with
|
match timeout with
|
||||||
| None -> create ()
|
| None -> create ()
|
||||||
| Some duration -> Lwt_utils.with_timeout duration (fun _ -> create ())
|
| Some duration -> Lwt_utils_unix.with_timeout duration (fun _ -> create ())
|
||||||
|
|
||||||
let is_locked fn =
|
let is_locked fn =
|
||||||
if not @@ Sys.file_exists fn then return false else
|
if not @@ Sys.file_exists fn then return false else
|
||||||
@ -50,7 +50,7 @@ let is_locked fn =
|
|||||||
|
|
||||||
let get_pid fn =
|
let get_pid fn =
|
||||||
let open Lwt_io in
|
let open Lwt_io in
|
||||||
Lwt_utils.protect begin fun () ->
|
Lwt_utils_unix.protect begin fun () ->
|
||||||
with_file ~mode:Input fn begin fun ic ->
|
with_file ~mode:Input fn begin fun ic ->
|
||||||
read ic >>= fun content ->
|
read ic >>= fun content ->
|
||||||
return (int_of_string content)
|
return (int_of_string content)
|
||||||
|
@ -161,161 +161,7 @@ let stable_sort cmp l =
|
|||||||
|
|
||||||
let sort = stable_sort
|
let sort = stable_sort
|
||||||
|
|
||||||
let read_bytes ?(pos = 0) ?len fd buf =
|
|
||||||
let len = match len with None -> Bytes.length buf - pos | Some l -> l in
|
|
||||||
let rec inner pos len =
|
|
||||||
if len = 0 then
|
|
||||||
Lwt.return_unit
|
|
||||||
else
|
|
||||||
Lwt_unix.read fd buf pos len >>= function
|
|
||||||
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
|
||||||
| nb_read -> inner (pos + nb_read) (len - nb_read)
|
|
||||||
in
|
|
||||||
inner pos len
|
|
||||||
|
|
||||||
let read_mbytes ?(pos=0) ?len fd buf =
|
|
||||||
let len = match len with None -> MBytes.length buf - pos | Some l -> l in
|
|
||||||
let rec inner pos len =
|
|
||||||
if len = 0 then
|
|
||||||
Lwt.return_unit
|
|
||||||
else
|
|
||||||
Lwt_bytes.read fd buf pos len >>= function
|
|
||||||
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
|
||||||
| nb_read -> inner (pos + nb_read) (len - nb_read)
|
|
||||||
in
|
|
||||||
inner pos len
|
|
||||||
|
|
||||||
let write_mbytes ?(pos=0) ?len descr buf =
|
|
||||||
let len = match len with None -> MBytes.length buf - pos | Some l -> l in
|
|
||||||
let rec inner pos len =
|
|
||||||
if len = 0 then
|
|
||||||
Lwt.return_unit
|
|
||||||
else
|
|
||||||
Lwt_bytes.write descr buf pos len >>= function
|
|
||||||
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
|
||||||
| nb_written -> inner (pos + nb_written) (len - nb_written) in
|
|
||||||
inner pos len
|
|
||||||
|
|
||||||
let write_bytes ?(pos=0) ?len descr buf =
|
|
||||||
let len = match len with None -> Bytes.length buf - pos | Some l -> l in
|
|
||||||
let rec inner pos len =
|
|
||||||
if len = 0 then
|
|
||||||
Lwt.return_unit
|
|
||||||
else
|
|
||||||
Lwt_unix.write descr buf pos len >>= function
|
|
||||||
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
|
||||||
| nb_written -> inner (pos + nb_written) (len - nb_written) in
|
|
||||||
inner pos len
|
|
||||||
|
|
||||||
let (>>=) = Lwt.bind
|
|
||||||
|
|
||||||
let remove_dir dir =
|
|
||||||
let rec remove dir =
|
|
||||||
let files = Lwt_unix.files_of_directory dir in
|
|
||||||
Lwt_stream.iter_s
|
|
||||||
(fun file ->
|
|
||||||
if file = "." || file = ".." then
|
|
||||||
Lwt.return ()
|
|
||||||
else begin
|
|
||||||
let file = Filename.concat dir file in
|
|
||||||
if Sys.is_directory file
|
|
||||||
then remove file
|
|
||||||
else Lwt_unix.unlink file
|
|
||||||
end)
|
|
||||||
files >>= fun () ->
|
|
||||||
Lwt_unix.rmdir dir in
|
|
||||||
if Sys.file_exists dir && Sys.is_directory dir then
|
|
||||||
remove dir
|
|
||||||
else
|
|
||||||
Lwt.return ()
|
|
||||||
|
|
||||||
let rec create_dir ?(perm = 0o755) dir =
|
|
||||||
Lwt_unix.file_exists dir >>= function
|
|
||||||
| false ->
|
|
||||||
create_dir (Filename.dirname dir) >>= fun () ->
|
|
||||||
Lwt_unix.mkdir dir perm
|
|
||||||
| true ->
|
|
||||||
Lwt_unix.stat dir >>= function
|
|
||||||
| { st_kind = S_DIR ; _ } -> Lwt.return_unit
|
|
||||||
| _ -> failwith "Not a directory"
|
|
||||||
|
|
||||||
let create_file ?(perm = 0o644) name content =
|
|
||||||
Lwt_unix.openfile name Unix.([O_TRUNC; O_CREAT; O_WRONLY]) perm >>= fun fd ->
|
|
||||||
Lwt_unix.write_string fd content 0 (String.length content) >>= fun _ ->
|
|
||||||
Lwt_unix.close fd
|
|
||||||
|
|
||||||
let safe_close fd =
|
|
||||||
Lwt.catch
|
|
||||||
(fun () -> Lwt_unix.close fd)
|
|
||||||
(fun _ -> Lwt.return_unit)
|
|
||||||
|
|
||||||
open Error_monad
|
|
||||||
|
|
||||||
type error += Canceled
|
|
||||||
|
|
||||||
let protect ?on_error ?canceler t =
|
|
||||||
let cancelation =
|
|
||||||
match canceler with
|
|
||||||
| None -> never_ending
|
|
||||||
| Some canceler ->
|
|
||||||
(Lwt_canceler.cancelation canceler >>= fun () ->
|
|
||||||
fail Canceled ) in
|
|
||||||
let res =
|
|
||||||
Lwt.pick [ cancelation ;
|
|
||||||
Lwt.catch t (fun exn -> fail (Exn exn)) ] in
|
|
||||||
res >>= function
|
|
||||||
| Ok _ -> res
|
|
||||||
| Error err ->
|
|
||||||
let canceled =
|
|
||||||
Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in
|
|
||||||
let err = if canceled then [Canceled] else err in
|
|
||||||
match on_error with
|
|
||||||
| None -> Lwt.return (Error err)
|
|
||||||
| Some on_error ->
|
|
||||||
Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn))
|
|
||||||
|
|
||||||
type error += Timeout
|
|
||||||
|
|
||||||
let () =
|
|
||||||
Error_monad.register_error_kind
|
|
||||||
`Temporary
|
|
||||||
~id:"utils.Timeout"
|
|
||||||
~title:"Timeout"
|
|
||||||
~description:"Timeout"
|
|
||||||
Data_encoding.unit
|
|
||||||
(function Timeout -> Some () | _ -> None)
|
|
||||||
(fun () -> Timeout)
|
|
||||||
|
|
||||||
let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f =
|
|
||||||
let timeout = Lwt_unix.sleep timeout in
|
|
||||||
let target = f canceler in
|
|
||||||
Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () ->
|
|
||||||
Lwt_unix.yield () >>= fun () ->
|
|
||||||
if Lwt.state target <> Lwt.Sleep then begin
|
|
||||||
Lwt.cancel timeout ;
|
|
||||||
target
|
|
||||||
end else begin
|
|
||||||
Lwt_canceler.cancel canceler >>= fun () ->
|
|
||||||
fail Timeout
|
|
||||||
end
|
|
||||||
|
|
||||||
let unless cond f =
|
let unless cond f =
|
||||||
if cond then Lwt.return () else f ()
|
if cond then Lwt.return () else f ()
|
||||||
|
|
||||||
let of_sockaddr = function
|
|
||||||
| Unix.ADDR_UNIX _ -> None
|
|
||||||
| Unix.ADDR_INET (addr, port) ->
|
|
||||||
match Ipaddr_unix.of_inet_addr addr with
|
|
||||||
| V4 addr -> Some (Ipaddr.v6_of_v4 addr, port)
|
|
||||||
| V6 addr -> Some (addr, port)
|
|
||||||
|
|
||||||
let getaddrinfo ~passive ~node ~service =
|
|
||||||
let open Lwt_unix in
|
|
||||||
getaddrinfo node service
|
|
||||||
( AI_SOCKTYPE SOCK_STREAM ::
|
|
||||||
(if passive then [AI_PASSIVE] else []) ) >>= fun addr ->
|
|
||||||
let points =
|
|
||||||
TzList.filter_map
|
|
||||||
(fun { ai_addr ; _ } -> of_sockaddr ai_addr)
|
|
||||||
addr in
|
|
||||||
Lwt.return points
|
|
||||||
|
@ -20,39 +20,6 @@ val worker:
|
|||||||
val trigger: unit -> (unit -> unit) * (unit -> unit Lwt.t)
|
val trigger: unit -> (unit -> unit) * (unit -> unit Lwt.t)
|
||||||
val sort: ('a -> 'a -> int Lwt.t) -> 'a list -> 'a list Lwt.t
|
val sort: ('a -> 'a -> int Lwt.t) -> 'a list -> 'a list Lwt.t
|
||||||
|
|
||||||
val read_bytes:
|
|
||||||
?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t
|
|
||||||
|
|
||||||
val read_mbytes:
|
|
||||||
?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t
|
|
||||||
|
|
||||||
val write_bytes:
|
|
||||||
?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t
|
|
||||||
val write_mbytes:
|
|
||||||
?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t
|
|
||||||
|
|
||||||
val remove_dir: string -> unit Lwt.t
|
|
||||||
val create_dir: ?perm:int -> string -> unit Lwt.t
|
|
||||||
val create_file: ?perm:int -> string -> string -> unit Lwt.t
|
|
||||||
|
|
||||||
val safe_close: Lwt_unix.file_descr -> unit Lwt.t
|
|
||||||
|
|
||||||
open Error_monad
|
|
||||||
|
|
||||||
type error += Canceled
|
|
||||||
val protect :
|
|
||||||
?on_error:(error list -> 'a tzresult Lwt.t) ->
|
|
||||||
?canceler:Lwt_canceler.t ->
|
|
||||||
(unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
|
|
||||||
|
|
||||||
type error += Timeout
|
|
||||||
val with_timeout:
|
|
||||||
?canceler:Lwt_canceler.t ->
|
|
||||||
float -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
|
|
||||||
|
|
||||||
val unless: bool -> (unit -> unit Lwt.t) -> unit Lwt.t
|
val unless: bool -> (unit -> unit Lwt.t) -> unit Lwt.t
|
||||||
|
|
||||||
val getaddrinfo:
|
|
||||||
passive:bool ->
|
|
||||||
node:string -> service:string ->
|
|
||||||
(Ipaddr.V6.t * int) list Lwt.t
|
|
||||||
|
168
src/lib_stdlib_lwt/lwt_utils_unix.ml
Normal file
168
src/lib_stdlib_lwt/lwt_utils_unix.ml
Normal file
@ -0,0 +1,168 @@
|
|||||||
|
(**************************************************************************)
|
||||||
|
(* *)
|
||||||
|
(* Copyright (c) 2014 - 2018. *)
|
||||||
|
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||||
|
(* *)
|
||||||
|
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||||
|
(* *)
|
||||||
|
(**************************************************************************)
|
||||||
|
|
||||||
|
open Lwt.Infix
|
||||||
|
|
||||||
|
let read_bytes ?(pos = 0) ?len fd buf =
|
||||||
|
let len = match len with None -> Bytes.length buf - pos | Some l -> l in
|
||||||
|
let rec inner pos len =
|
||||||
|
if len = 0 then
|
||||||
|
Lwt.return_unit
|
||||||
|
else
|
||||||
|
Lwt_unix.read fd buf pos len >>= function
|
||||||
|
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
||||||
|
| nb_read -> inner (pos + nb_read) (len - nb_read)
|
||||||
|
in
|
||||||
|
inner pos len
|
||||||
|
|
||||||
|
let read_mbytes ?(pos=0) ?len fd buf =
|
||||||
|
let len = match len with None -> MBytes.length buf - pos | Some l -> l in
|
||||||
|
let rec inner pos len =
|
||||||
|
if len = 0 then
|
||||||
|
Lwt.return_unit
|
||||||
|
else
|
||||||
|
Lwt_bytes.read fd buf pos len >>= function
|
||||||
|
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
||||||
|
| nb_read -> inner (pos + nb_read) (len - nb_read)
|
||||||
|
in
|
||||||
|
inner pos len
|
||||||
|
|
||||||
|
let write_mbytes ?(pos=0) ?len descr buf =
|
||||||
|
let len = match len with None -> MBytes.length buf - pos | Some l -> l in
|
||||||
|
let rec inner pos len =
|
||||||
|
if len = 0 then
|
||||||
|
Lwt.return_unit
|
||||||
|
else
|
||||||
|
Lwt_bytes.write descr buf pos len >>= function
|
||||||
|
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
||||||
|
| nb_written -> inner (pos + nb_written) (len - nb_written) in
|
||||||
|
inner pos len
|
||||||
|
|
||||||
|
let write_bytes ?(pos=0) ?len descr buf =
|
||||||
|
let len = match len with None -> Bytes.length buf - pos | Some l -> l in
|
||||||
|
let rec inner pos len =
|
||||||
|
if len = 0 then
|
||||||
|
Lwt.return_unit
|
||||||
|
else
|
||||||
|
Lwt_unix.write descr buf pos len >>= function
|
||||||
|
| 0 -> Lwt.fail End_of_file (* other endpoint cleanly closed its connection *)
|
||||||
|
| nb_written -> inner (pos + nb_written) (len - nb_written) in
|
||||||
|
inner pos len
|
||||||
|
|
||||||
|
let (>>=) = Lwt.bind
|
||||||
|
|
||||||
|
let remove_dir dir =
|
||||||
|
let rec remove dir =
|
||||||
|
let files = Lwt_unix.files_of_directory dir in
|
||||||
|
Lwt_stream.iter_s
|
||||||
|
(fun file ->
|
||||||
|
if file = "." || file = ".." then
|
||||||
|
Lwt.return ()
|
||||||
|
else begin
|
||||||
|
let file = Filename.concat dir file in
|
||||||
|
if Sys.is_directory file
|
||||||
|
then remove file
|
||||||
|
else Lwt_unix.unlink file
|
||||||
|
end)
|
||||||
|
files >>= fun () ->
|
||||||
|
Lwt_unix.rmdir dir in
|
||||||
|
if Sys.file_exists dir && Sys.is_directory dir then
|
||||||
|
remove dir
|
||||||
|
else
|
||||||
|
Lwt.return ()
|
||||||
|
|
||||||
|
let rec create_dir ?(perm = 0o755) dir =
|
||||||
|
Lwt_unix.file_exists dir >>= function
|
||||||
|
| false ->
|
||||||
|
create_dir (Filename.dirname dir) >>= fun () ->
|
||||||
|
Lwt_unix.mkdir dir perm
|
||||||
|
| true ->
|
||||||
|
Lwt_unix.stat dir >>= function
|
||||||
|
| { st_kind = S_DIR ; _ } -> Lwt.return_unit
|
||||||
|
| _ -> Pervasives.failwith "Not a directory"
|
||||||
|
|
||||||
|
let create_file ?(perm = 0o644) name content =
|
||||||
|
Lwt_unix.openfile name Unix.([O_TRUNC; O_CREAT; O_WRONLY]) perm >>= fun fd ->
|
||||||
|
Lwt_unix.write_string fd content 0 (String.length content) >>= fun _ ->
|
||||||
|
Lwt_unix.close fd
|
||||||
|
|
||||||
|
let safe_close fd =
|
||||||
|
Lwt.catch
|
||||||
|
(fun () -> Lwt_unix.close fd)
|
||||||
|
(fun _ -> Lwt.return_unit)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
let of_sockaddr = function
|
||||||
|
| Unix.ADDR_UNIX _ -> None
|
||||||
|
| Unix.ADDR_INET (addr, port) ->
|
||||||
|
match Ipaddr_unix.of_inet_addr addr with
|
||||||
|
| V4 addr -> Some (Ipaddr.v6_of_v4 addr, port)
|
||||||
|
| V6 addr -> Some (addr, port)
|
||||||
|
|
||||||
|
let getaddrinfo ~passive ~node ~service =
|
||||||
|
let open Lwt_unix in
|
||||||
|
getaddrinfo node service
|
||||||
|
( AI_SOCKTYPE SOCK_STREAM ::
|
||||||
|
(if passive then [AI_PASSIVE] else []) ) >>= fun addr ->
|
||||||
|
let points =
|
||||||
|
TzList.filter_map
|
||||||
|
(fun { ai_addr ; _ } -> of_sockaddr ai_addr)
|
||||||
|
addr in
|
||||||
|
Lwt.return points
|
||||||
|
|
||||||
|
open Error_monad
|
||||||
|
|
||||||
|
type error += Canceled
|
||||||
|
|
||||||
|
let protect ?on_error ?canceler t =
|
||||||
|
let cancelation =
|
||||||
|
match canceler with
|
||||||
|
| None -> Lwt_utils.never_ending
|
||||||
|
| Some canceler ->
|
||||||
|
(Lwt_canceler.cancelation canceler >>= fun () ->
|
||||||
|
fail Canceled ) in
|
||||||
|
let res =
|
||||||
|
Lwt.pick [ cancelation ;
|
||||||
|
Lwt.catch t (fun exn -> fail (Exn exn)) ] in
|
||||||
|
res >>= function
|
||||||
|
| Ok _ -> res
|
||||||
|
| Error err ->
|
||||||
|
let canceled =
|
||||||
|
Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in
|
||||||
|
let err = if canceled then [Canceled] else err in
|
||||||
|
match on_error with
|
||||||
|
| None -> Lwt.return (Error err)
|
||||||
|
| Some on_error ->
|
||||||
|
Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn))
|
||||||
|
|
||||||
|
type error += Timeout
|
||||||
|
|
||||||
|
let () =
|
||||||
|
register_error_kind
|
||||||
|
`Temporary
|
||||||
|
~id:"utils.Timeout"
|
||||||
|
~title:"Timeout"
|
||||||
|
~description:"Timeout"
|
||||||
|
Data_encoding.unit
|
||||||
|
(function Timeout -> Some () | _ -> None)
|
||||||
|
(fun () -> Timeout)
|
||||||
|
|
||||||
|
let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f =
|
||||||
|
let timeout = Lwt_unix.sleep timeout in
|
||||||
|
let target = f canceler in
|
||||||
|
Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () ->
|
||||||
|
Lwt_unix.yield () >>= fun () ->
|
||||||
|
if Lwt.state target <> Lwt.Sleep then begin
|
||||||
|
Lwt.cancel timeout ;
|
||||||
|
target
|
||||||
|
end else begin
|
||||||
|
Lwt_canceler.cancel canceler >>= fun () ->
|
||||||
|
fail Timeout
|
||||||
|
end
|
45
src/lib_stdlib_lwt/lwt_utils_unix.mli
Normal file
45
src/lib_stdlib_lwt/lwt_utils_unix.mli
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
(**************************************************************************)
|
||||||
|
(* *)
|
||||||
|
(* Copyright (c) 2014 - 2018. *)
|
||||||
|
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||||
|
(* *)
|
||||||
|
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||||
|
(* *)
|
||||||
|
(**************************************************************************)
|
||||||
|
|
||||||
|
val read_bytes:
|
||||||
|
?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t
|
||||||
|
|
||||||
|
val read_mbytes:
|
||||||
|
?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t
|
||||||
|
|
||||||
|
val write_bytes:
|
||||||
|
?pos:int -> ?len:int -> Lwt_unix.file_descr -> bytes -> unit Lwt.t
|
||||||
|
val write_mbytes:
|
||||||
|
?pos:int -> ?len:int -> Lwt_unix.file_descr -> MBytes.t -> unit Lwt.t
|
||||||
|
|
||||||
|
val remove_dir: string -> unit Lwt.t
|
||||||
|
val create_dir: ?perm:int -> string -> unit Lwt.t
|
||||||
|
val create_file: ?perm:int -> string -> string -> unit Lwt.t
|
||||||
|
|
||||||
|
val safe_close: Lwt_unix.file_descr -> unit Lwt.t
|
||||||
|
|
||||||
|
val getaddrinfo:
|
||||||
|
passive:bool ->
|
||||||
|
node:string -> service:string ->
|
||||||
|
(Ipaddr.V6.t * int) list Lwt.t
|
||||||
|
|
||||||
|
open Error_monad
|
||||||
|
|
||||||
|
type error += Canceled
|
||||||
|
|
||||||
|
val protect :
|
||||||
|
?on_error:(error list -> 'a tzresult Lwt.t) ->
|
||||||
|
?canceler:Lwt_canceler.t ->
|
||||||
|
(unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
|
||||||
|
|
||||||
|
type error += Timeout
|
||||||
|
val with_timeout:
|
||||||
|
?canceler:Lwt_canceler.t ->
|
||||||
|
float -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
|
||||||
|
|
Loading…
Reference in New Issue
Block a user