From 2716cbc1f10ff93fb0e1225fc19618f06ea97801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 19 Feb 2019 16:27:14 +0100 Subject: [PATCH] P2p: introduce `P2p_fd` --- src/lib_p2p/p2p_fd.ml | 107 +++++++++++++++++++ src/lib_p2p/p2p_fd.mli | 39 +++++++ src/lib_p2p/p2p_io_scheduler.ml | 122 ++++++++++------------ src/lib_p2p/p2p_io_scheduler.mli | 6 +- src/lib_p2p/p2p_pool.ml | 8 +- src/lib_p2p/p2p_pool.mli | 2 +- src/lib_p2p/p2p_welcome.ml | 2 +- src/lib_p2p/test/test_p2p_io_scheduler.ml | 13 ++- src/lib_p2p/test/test_p2p_socket.ml | 6 +- 9 files changed, 221 insertions(+), 84 deletions(-) create mode 100644 src/lib_p2p/p2p_fd.ml create mode 100644 src/lib_p2p/p2p_fd.mli diff --git a/src/lib_p2p/p2p_fd.ml b/src/lib_p2p/p2p_fd.ml new file mode 100644 index 000000000..980683695 --- /dev/null +++ b/src/lib_p2p/p2p_fd.ml @@ -0,0 +1,107 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(* logging facility to monitor sockets *) + +let is_not_windows = Sys.os_type <> "Win32" +let () = + (* Otherwise some writes trigger a SIGPIPE instead of raising an + Lwt_unit exception. In the node, this is already done by + Cohttp, so this is only useful when using the P2P layer as a + stand alone library. *) + if is_not_windows then + Sys.(set_signal sigpipe Signal_ignore) + +(* Logging facility for the P2P layer *) +module Log = Logging.Make(struct let name = "p2p.fd" end) + +type t = { + fd : Lwt_unix.file_descr ; + id : int ; + mutable nread : int ; + mutable nwrit : int ; +} + +(* we use a prefix ' cnx:' that allows easy grepping in the log to lookup + everything related to a particular connection. *) +let log t fmt = + Format.kasprintf (fun s -> Log.log_info "cnx:%d:%s" t.id s) fmt + +let create = + let counter = ref 0 in + function fd -> + incr counter; + let t = { fd ; id = !counter ; nread = 0 ; nwrit = 0 } in + log t "create: fd %d" t.id ; + t + +let string_of_sockaddr addr = + match addr with + | Lwt_unix.ADDR_INET (ip, port) -> + Printf.sprintf "%s:%d" (Unix.string_of_inet_addr ip) port + | Lwt_unix.ADDR_UNIX file -> + Printf.sprintf "@%s" file + +let id t = t.id + +let socket proto kind arg = + create (Lwt_unix.socket proto kind arg) + +let close t = + log t "close: stats %d/%d" t.nread t.nwrit ; + Lwt_utils_unix.safe_close t.fd + +let read t buf pos len = + log t "try-read: %d" len; + Lwt_bytes.read t.fd buf pos len >>= fun nread -> + t.nread <- t.nread + nread ; + log t "read: %d (%d)" nread t.nread ; + Lwt.return nread + +let write t buf = + let len = MBytes.length buf in + log t "try-write: %d" len; + Lwt_utils_unix.write_mbytes t.fd buf >>= fun () -> + t.nwrit <- t.nwrit + len ; + log t "written: %d (%d)" len t.nwrit ; + Lwt.return () + +let connect t saddr = + log t "connect: %s" (string_of_sockaddr saddr); + Lwt_unix.connect t.fd saddr + +let accept sock = + Lwt_unix.accept sock >>= fun (fd, saddr) -> + let t = create fd in + log t "accept: %s" (string_of_sockaddr saddr); + Lwt.return (t, saddr) + +module Table = + Hashtbl.Make(struct + type nonrec t = t + let equal { id = x ; _ } { id = y ; _ } = x = y + let hash { id ; _ } = Hashtbl.hash id + end) + diff --git a/src/lib_p2p/p2p_fd.mli b/src/lib_p2p/p2p_fd.mli new file mode 100644 index 000000000..a8a3adb1c --- /dev/null +++ b/src/lib_p2p/p2p_fd.mli @@ -0,0 +1,39 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(* Bottom-up logging facility for the P2P layer. Use this to track + all information related to a particular connection. *) + +type t + +val id : t -> int +val read : t -> Lwt_bytes.t -> int -> int -> int Lwt.t +val close : t -> unit Lwt.t +val write : t -> MBytes.t -> unit Lwt.t +val socket : Lwt_unix.socket_domain -> Lwt_unix.socket_type -> int -> t +val connect : t -> Lwt_unix.sockaddr -> unit Lwt.t +val accept : Lwt_unix.file_descr -> (t * Lwt_unix.sockaddr) Lwt.t + +module Table : Hashtbl.S with type key = t diff --git a/src/lib_p2p/p2p_io_scheduler.ml b/src/lib_p2p/p2p_io_scheduler.ml index 023ebf91b..6dc8e50a4 100644 --- a/src/lib_p2p/p2p_io_scheduler.ml +++ b/src/lib_p2p/p2p_io_scheduler.ml @@ -25,22 +25,8 @@ (* TODO decide whether we need to preallocate buffers or not. *) -let () = - (* Otherwise some writes trigger a SIGPIPE instead of raising an - Lwt_unit exception. In the node, this is already done by - Cohttp, so this is only useful when using the P2P layer as a - stand alone library. *) - if Sys.os_type <> "Win32" then - Sys.(set_signal sigpipe Signal_ignore) - include Logging.Make (struct let name = "p2p.io-scheduler" end) -module Inttbl = Hashtbl.Make(struct - type t = int - let equal (x: int) (y: int) = x = y - let hash = Hashtbl.hash - end) - let alpha = 0.2 module type IO = sig @@ -241,12 +227,12 @@ end module ReadScheduler = Scheduler(struct let name = "io_scheduler(read)" - type in_param = Lwt_unix.file_descr * int + type in_param = P2p_fd.t * int let pop (fd, maxlen) = Lwt.catch (fun () -> let buf = MBytes.create maxlen in - Lwt_bytes.read fd buf 0 maxlen >>= fun len -> + P2p_fd.read fd buf 0 maxlen >>= fun len -> if len = 0 then fail P2p_errors.Connection_closed else @@ -274,11 +260,11 @@ module WriteScheduler = Scheduler(struct Lwt.catch (fun () -> Lwt_pipe.pop p >>= return) (fun _ -> fail (Exn Lwt_pipe.Closed)) - type out_param = Lwt_unix.file_descr + type out_param = P2p_fd.t let push fd buf = Lwt.catch (fun () -> - Lwt_utils_unix.write_mbytes fd buf >>= return) + P2p_fd.write fd buf >>= return) (function | Unix.Unix_error(Unix.ECONNRESET, _, _) | Unix.Unix_error(Unix.EPIPE, _, _) @@ -291,9 +277,8 @@ module WriteScheduler = Scheduler(struct end) type connection = { - id: int ; sched: t ; - conn: Lwt_unix.file_descr ; + conn: P2p_fd.t; canceler: Lwt_canceler.t ; read_conn: ReadScheduler.connection ; read_queue: MBytes.t tzresult Lwt_pipe.t ; @@ -304,7 +289,7 @@ type connection = { and t = { mutable closed: bool ; - connected: connection Inttbl.t ; + connected: connection P2p_fd.Table.t ; read_scheduler: ReadScheduler.t ; write_scheduler: WriteScheduler.t ; max_upload_speed: int option ; (* bytes per second. *) @@ -320,11 +305,11 @@ let reset_quota st = Moving_average.stat st.read_scheduler.counter and { Moving_average.average = current_outflow } = Moving_average.stat st.write_scheduler.counter in - let nb_conn = Inttbl.length st.connected in + let nb_conn = P2p_fd.Table.length st.connected in if nb_conn > 0 then begin let fair_read_quota = current_inflow / nb_conn and fair_write_quota = current_outflow / nb_conn in - Inttbl.iter + P2p_fd.Table.iter (fun _id conn -> conn.read_conn.last_quota <- fair_read_quota ; conn.read_conn.quota <- @@ -345,7 +330,7 @@ let create log_info "--> create" ; let st = { closed = false ; - connected = Inttbl.create 53 ; + connected = P2p_fd.Table.create 53 ; read_scheduler = ReadScheduler.create max_download_speed ; write_scheduler = WriteScheduler.create max_upload_speed ; max_upload_speed ; @@ -367,45 +352,43 @@ let read_size = function let write_size mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes + Lwt_pipe.push_overhead -let register = - let cpt = ref 0 in - fun st conn -> - if st.closed then begin - Lwt.async (fun () -> Lwt_utils_unix.safe_close conn) ; - raise Closed - end else begin - let id = incr cpt; !cpt in - let canceler = Lwt_canceler.create () in - let read_size = - Option.map st.read_queue_size ~f:(fun v -> v, read_size) in - let write_size = - Option.map st.write_queue_size ~f:(fun v -> v, write_size) in - let read_queue = Lwt_pipe.create ?size:read_size () in - let write_queue = Lwt_pipe.create ?size:write_size () in - let read_conn = - ReadScheduler.create_connection - st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id - and write_conn = - WriteScheduler.create_connection - st.write_scheduler write_queue conn canceler id in - Lwt_canceler.on_cancel canceler begin fun () -> - Inttbl.remove st.connected id ; - Moving_average.destroy read_conn.counter ; - Moving_average.destroy write_conn.counter ; - Lwt_pipe.close write_queue ; - Lwt_pipe.close read_queue ; - Lwt_utils_unix.safe_close conn - end ; - let conn = { - sched = st ; id ; conn ; canceler ; - read_queue ; read_conn ; - write_queue ; write_conn ; - partial_read = None ; - } in - Inttbl.add st.connected id conn ; - log_info "--> register (%d)" conn.id ; - conn - end +let register st conn = + if st.closed then begin + Lwt.async (fun () -> P2p_fd.close conn) ; + raise Closed + end else begin + let id = P2p_fd.id conn in + let canceler = Lwt_canceler.create () in + let read_size = + Option.map st.read_queue_size ~f:(fun v -> v, read_size) in + let write_size = + Option.map st.write_queue_size ~f:(fun v -> v, write_size) in + let read_queue = Lwt_pipe.create ?size:read_size () in + let write_queue = Lwt_pipe.create ?size:write_size () in + let read_conn = + ReadScheduler.create_connection + st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id + and write_conn = + WriteScheduler.create_connection + st.write_scheduler write_queue conn canceler id in + Lwt_canceler.on_cancel canceler begin fun () -> + P2p_fd.Table.remove st.connected conn ; + Moving_average.destroy read_conn.counter ; + Moving_average.destroy write_conn.counter ; + Lwt_pipe.close write_queue ; + Lwt_pipe.close read_queue ; + P2p_fd.close conn + end ; + let conn = { + sched = st ; conn ; canceler ; + read_queue ; read_conn ; + write_queue ; write_conn ; + partial_read = None ; + } in + P2p_fd.Table.add st.connected conn.conn conn ; + log_info "--> register (%d)" id ; + conn + end let write { write_queue } msg = Lwt.catch @@ -487,8 +470,9 @@ let stat { read_conn ; write_conn} = convert ~rs ~ws let close ?timeout conn = - lwt_log_info "--> close (%d)" conn.id >>= fun () -> - Inttbl.remove conn.sched.connected conn.id ; + let id = P2p_fd.id conn.conn in + lwt_log_info "--> close (%d)" id >>= fun () -> + P2p_fd.Table.remove conn.sched.connected conn.conn ; Lwt_pipe.close conn.write_queue ; begin match timeout with @@ -501,20 +485,22 @@ let close ?timeout conn = (fun canceler -> return (Lwt_canceler.cancelation canceler)) end >>=? fun _ -> conn.write_conn.current_push >>= fun res -> - lwt_log_info "<-- close (%d)" conn.id >>= fun () -> + lwt_log_info "<-- close (%d)" id >>= fun () -> Lwt.return res let iter_connection { connected } f = - Inttbl.iter f connected + P2p_fd.Table.iter (fun _ conn -> f conn) connected let shutdown ?timeout st = lwt_log_info "--> shutdown" >>= fun () -> st.closed <- true ; ReadScheduler.shutdown st.read_scheduler >>= fun () -> - Inttbl.fold + P2p_fd.Table.fold (fun _peer_id conn acc -> close ?timeout conn >>= fun _ -> acc) st.connected Lwt.return_unit >>= fun () -> WriteScheduler.shutdown st.write_scheduler >>= fun () -> lwt_log_info "<-- shutdown" >>= fun () -> Lwt.return_unit + +let id conn = P2p_fd.id conn.conn diff --git a/src/lib_p2p/p2p_io_scheduler.mli b/src/lib_p2p/p2p_io_scheduler.mli index 3ee1a1dfa..7235056f9 100644 --- a/src/lib_p2p/p2p_io_scheduler.mli +++ b/src/lib_p2p/p2p_io_scheduler.mli @@ -58,7 +58,7 @@ val create: max upload (resp. download) speed, and specified read (resp. write) queue sizes (in bytes) for connections. *) -val register: t -> Lwt_unix.file_descr -> connection +val register: t -> P2p_fd.t -> connection (** [register sched fd] is a [connection] managed by [sched]. *) val write: connection -> MBytes.t -> unit tzresult Lwt.t @@ -92,7 +92,7 @@ val global_stat: t -> P2p_stat.t (** [global_stat sched] is a snapshot of [sched]'s bandwidth usage (sum of [stat conn] for each [conn] in [sched]). *) -val iter_connection: t -> (int -> connection -> unit) -> unit +val iter_connection: t -> (connection -> unit) -> unit (** [iter_connection sched f] applies [f] on each connection managed by [sched]. *) @@ -104,3 +104,5 @@ val shutdown: ?timeout:float -> t -> unit Lwt.t (** [shutdown sched] returns after all connections managed by [sched] have been closed and [sched]'s inner worker has successfully canceled. *) + +val id : connection -> int diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index ef5ea25ee..686366e9a 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -728,18 +728,18 @@ let rec connect ?timeout pool point = P2p_errors.Private_mode >>=? fun () -> fail_unless_disconnected_point point_info >>=? fun () -> P2p_point_state.set_requested point_info canceler ; - let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () -> protect ~canceler begin fun () -> log pool (Outgoing_connection point) ; - Lwt_unix.connect fd uaddr >>= fun () -> + P2p_fd.connect fd uaddr >>= fun () -> return_unit end ~on_error: begin fun err -> lwt_debug "connect: %a -> disconnect" P2p_point.Id.pp point >>= fun () -> P2p_point_state.set_disconnected point_info ; - Lwt_utils_unix.safe_close fd >>= fun () -> + P2p_fd.close fd >>= fun () -> match err with | [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] -> fail P2p_errors.Connection_refused @@ -1127,7 +1127,7 @@ let accept pool fd point = || pool.config.max_connections <= active_connections pool (* silently ignore banned points *) || (P2p_acl.banned_addr pool.acl (fst point)) then - Lwt.async (fun () -> Lwt_utils_unix.safe_close fd) + Lwt.async (fun () -> P2p_fd.close fd) else let canceler = Lwt_canceler.create () in P2p_point.Table.add pool.incoming point canceler ; diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index bf6b9cad8..b930d743e 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -224,7 +224,7 @@ val connect: in [pool] in less than [timeout] seconds. *) val accept: - ('msg, 'peer_meta,'conn_meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit + ('msg, 'peer_meta,'conn_meta) pool -> P2p_fd.t -> P2p_point.Id.t -> unit (** [accept pool fd point] instructs [pool] to start the process of accepting a connection from [fd]. Used by [P2p]. *) diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index 9a8e0b06b..bacaa6c30 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -38,7 +38,7 @@ let rec worker_loop st = let Pool pool = st.pool in Lwt_unix.yield () >>= fun () -> protect ~canceler:st.canceler begin fun () -> - Lwt_unix.accept st.socket >>= return + P2p_fd.accept st.socket >>= return end >>= function | Ok (fd, addr) -> let point = diff --git a/src/lib_p2p/test/test_p2p_io_scheduler.ml b/src/lib_p2p/test/test_p2p_io_scheduler.ml index bb5d98af3..cb7a68756 100644 --- a/src/lib_p2p/test/test_p2p_io_scheduler.ml +++ b/src/lib_p2p/test/test_p2p_io_scheduler.ml @@ -48,7 +48,7 @@ let rec listen ?port addr = end let accept main_socket = - Lwt_unix.accept main_socket >>= fun (fd, _sockaddr) -> + P2p_fd.accept main_socket >>= fun (fd, _sockaddr) -> return fd let rec accept_n main_socket n = @@ -60,10 +60,10 @@ let rec accept_n main_socket n = return (conn :: acc) let connect addr port = - let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in - Lwt_unix.connect fd uaddr >>= fun () -> + P2p_fd.connect fd uaddr >>= fun () -> return fd let simple_msgs = @@ -107,8 +107,11 @@ let server log_notice "Stat: %a" P2p_stat.pp (P2p_io_scheduler.global_stat sched) ; if display_client_stat then P2p_io_scheduler.iter_connection sched - (fun id conn -> - log_notice " client(%d) %a" id P2p_stat.pp (P2p_io_scheduler.stat conn)) ; + (fun conn -> + log_notice + " client(%d) %a" + (P2p_io_scheduler.id conn) + P2p_stat.pp (P2p_io_scheduler.stat conn)) ; end ; (* Accept and read message until the connection is closed. *) accept_n main_socket n >>=? fun conns -> diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index 24cbb56d6..6fb8fa9a2 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -105,7 +105,7 @@ let run_nodes client server = Process.wait_all nodes let raw_accept sched main_socket = - Lwt_unix.accept main_socket >>= fun (fd, sockaddr) -> + P2p_fd.accept main_socket >>= fun (fd, sockaddr) -> let fd = P2p_io_scheduler.register sched fd in let point = match sockaddr with @@ -122,10 +122,10 @@ let accept sched main_socket = conn_meta_config let raw_connect sched addr port = - let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in - Lwt_unix.connect fd uaddr >>= fun () -> + P2p_fd.connect fd uaddr >>= fun () -> let fd = P2p_io_scheduler.register sched fd in Lwt.return fd