diff --git a/.gitignore b/.gitignore index dc5373a3c..199bb28a1 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ /test/test-basic /test/test-data-encoding /test/test-p2p-io-scheduler +/test/test-p2p-connection /test/LOG *~ diff --git a/src/Makefile b/src/Makefile index 6d4ee8cfa..2e8d89a92 100644 --- a/src/Makefile +++ b/src/Makefile @@ -259,6 +259,7 @@ NODE_LIB_INTFS := \ \ node/net/p2p_types.mli \ node/net/p2p_io_scheduler.mli \ + node/net/p2p_connection.mli \ node/net/p2p.mli \ node/net/RPC_server.mli \ \ @@ -291,6 +292,7 @@ NODE_LIB_IMPLS := \ \ node/net/p2p_types.ml \ node/net/p2p_io_scheduler.ml \ + node/net/p2p_connection.ml \ node/net/p2p.ml \ \ node/net/RPC_server.ml \ diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml new file mode 100644 index 000000000..07fbc762a --- /dev/null +++ b/src/node/net/p2p_connection.ml @@ -0,0 +1,410 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* TODO encode/encrypt before to push into the writer pipe. *) +(* TODO patch Sodium.Box to avoid allocation of the encrypted buffer.*) +(* TODO patch Data_encoding for continuation-based binary writer/reader. *) +(* TODO use queue bound by memory size of its elements, not by the + number of elements. *) +(* TODO test `close ~wait:true`. *) +(* TODO nothing in welcoming message proves that the incoming peer is + the owner of the public key... only the first message will + really proves it. Should this to be changed? Not really + important, but... an attacker might forge a random public key + with enough proof of work (hard task), open a connection, wait + infinitly. This would avoid the real peer to talk with us. And + this might also have an influence on its "score". *) + +open P2p_types + +include Logging.Make(struct let name = "p2p.connection" end) + +type error += Decipher_error +type error += Invalid_message_size +type error += Encoding_error +type error += Rejected +type error += Decoding_error +type error += Myself of Id_point.t +type error += Not_enough_proof_of_work of Gid.t + +type cryptobox_data = { + channel_key : Crypto_box.channel_key ; + mutable local_nonce : Crypto_box.nonce ; + mutable remote_nonce : Crypto_box.nonce ; +} + +let header_length = 2 +let crypto_overhead = 18 (* FIXME import from Sodium.Box. *) +let max_content_length = + 1 lsl (header_length * 8) - crypto_overhead + +module Connection_message = struct + + type t = { + port : int option ; + versions : Version.t list ; + public_key : Crypto_box.public_key ; + proof_of_work_stamp : Crypto_box.nonce ; + message_nonce : Crypto_box.nonce ; + } + + let encoding = + let open Data_encoding in + conv + (fun { port ; public_key ; proof_of_work_stamp ; + message_nonce ; versions } -> + let port = match port with None -> 0 | Some port -> port in + (port, public_key, proof_of_work_stamp, + message_nonce, versions)) + (fun (port, public_key, proof_of_work_stamp, + message_nonce, versions) -> + let port = if port = 0 then None else Some port in + { port ; public_key ; proof_of_work_stamp ; + message_nonce ; versions }) + (obj5 + (req "port" uint16) + (req "pubkey" Crypto_box.public_key_encoding) + (req "proof_of_work_stamp" Crypto_box.nonce_encoding) + (req "message_nonce" Crypto_box.nonce_encoding) + (req "versions" (Variable.list Version.encoding))) + + let write fd message = + let encoded_message_len = + Data_encoding.Binary.length encoding message in + fail_unless + (encoded_message_len < max_content_length) + Encoding_error >>=? fun () -> + let len = header_length + encoded_message_len in + let buf = MBytes.create len in + match Data_encoding.Binary.write encoding message buf header_length with + | None -> + fail Encoding_error + | Some last -> + fail_unless (last = len) Encoding_error >>=? fun () -> + MBytes.set_int16 buf 0 encoded_message_len ; + P2p_io_scheduler.write fd buf + + let read fd = + let header_buf = MBytes.create header_length in + P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () -> + let len = MBytes.get_uint16 header_buf 0 in + let buf = MBytes.create len in + P2p_io_scheduler.read_full ~len fd buf >>=? fun () -> + match Data_encoding.Binary.read encoding buf 0 len with + | None -> + fail Decoding_error + | Some (read_len, message) -> + if read_len <> len then + fail Decoding_error + else + return message + +end + +module Ack = struct + + type t = bool + let ack = MBytes.of_string "\255" + let nack = MBytes.of_string "\000" + + let write fd b = + match b with + | true -> + P2p_io_scheduler.write fd ack + | false -> + P2p_io_scheduler.write fd nack + + let read fd = + let buf = MBytes.create 1 in + P2p_io_scheduler.read_full fd buf >>=? fun () -> + return (buf <> nack) + +end + +type authenticated_fd = + P2p_io_scheduler.connection * Connection_info.t * cryptobox_data + +let kick (fd, _ , _) = + Ack.write fd false >>= fun _ -> + P2p_io_scheduler.close fd >>= fun _ -> + Lwt.return_unit + +(* First step: write and read credentials, makes no difference + whether we're trying to connect to a peer or checking an incoming + connection, both parties must first introduce themselves. *) +let authenticate + ~proof_of_work_target + ~incoming fd (remote_addr, remote_socket_port as point) + ?listening_port identity supported_versions = + let local_nonce = Crypto_box.random_nonce () in + lwt_debug "Sending authenfication to %a" Point.pp point >>= fun () -> + Connection_message.write fd + { public_key = identity.Identity.public_key ; + proof_of_work_stamp = identity.proof_of_work_stamp ; + message_nonce = local_nonce ; + port = listening_port ; + versions = supported_versions } >>=? fun () -> + Connection_message.read fd >>=? fun msg -> + let remote_listening_port = + if incoming then msg.port else Some remote_socket_port in + let id_point = remote_addr, remote_listening_port in + let remote_gid = Crypto_box.hash msg.public_key in + fail_unless + (remote_gid <> identity.Identity.gid) + (Myself id_point) >>=? fun () -> + fail_unless + (Crypto_box.check_proof_of_work + msg.public_key msg.proof_of_work_stamp proof_of_work_target) + (Not_enough_proof_of_work remote_gid) >>=? fun () -> + let channel_key = + Crypto_box.precompute identity.Identity.secret_key msg.public_key in + let info = + { Connection_info.gid = remote_gid ; versions = msg.versions ; incoming ; + id_point ; remote_socket_port ;} in + let cryptobox_data = + { channel_key ; local_nonce ; + remote_nonce = msg.message_nonce } in + return (info, (fd, info, cryptobox_data)) + +type connection = { + info : Connection_info.t ; + fd : P2p_io_scheduler.connection ; + cryptobox_data : cryptobox_data ; +} + +module Reader = struct + + type 'msg t = { + canceler: Canceler.t ; + conn: connection ; + encoding: 'msg Data_encoding.t ; + messages: 'msg tzresult Lwt_pipe.t ; + mutable worker: unit Lwt.t ; + } + + let read_chunk { fd ; cryptobox_data } = + let header_buf = MBytes.create header_length in + P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () -> + let len = MBytes.get_uint16 header_buf 0 in + let buf = MBytes.create len in + P2p_io_scheduler.read_full ~len fd buf >>=? fun () -> + let remote_nonce = cryptobox_data.remote_nonce in + cryptobox_data.remote_nonce <- Crypto_box.increment_nonce remote_nonce ; + match + Crypto_box.fast_box_open cryptobox_data.channel_key buf remote_nonce + with + | None -> + fail Decipher_error + | Some buf -> + return buf + + let rec read_message st buf = + return (Data_encoding.Binary.of_bytes st.encoding buf) + + let rec worker_loop st = + Lwt_unix.yield () >>= fun () -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + read_chunk st.conn >>=? fun buf -> + read_message st buf + end >>= function + | Ok None -> + Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> + worker_loop st + | Ok (Some msg) -> + Lwt_pipe.push st.messages (Ok msg) >>= fun () -> + worker_loop st + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + Lwt.return_unit + | Error _ as err -> + Lwt_pipe.push st.messages err >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + + let run ?size conn encoding canceler = + let st = + { canceler ; conn ; encoding ; + messages = Lwt_pipe.create ?size () ; + worker = Lwt.return_unit ; + } in + Canceler.on_cancel st.canceler begin fun () -> + Lwt_pipe.close st.messages ; + Lwt.return_unit + end ; + st.worker <- + Lwt_utils.worker "reader" + (fun () -> worker_loop st) + (fun () -> Canceler.cancel st.canceler) ; + st + + let shutdown st = + Canceler.cancel st.canceler >>= fun () -> + st.worker + +end + +module Writer = struct + + type 'msg t = { + canceler: Canceler.t ; + conn: connection ; + encoding: 'msg Data_encoding.t ; + messages: ('msg * unit tzresult Lwt.u option) Lwt_pipe.t ; + mutable worker: unit Lwt.t ; + } + + let write_chunk { cryptobox_data ; fd } buf = + let header_buf = MBytes.create header_length in + let local_nonce = cryptobox_data.local_nonce in + cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ; + let encrypted_message = + Crypto_box.fast_box cryptobox_data.channel_key buf local_nonce in + let encrypted_len = MBytes.length encrypted_message in + fail_unless + (encrypted_len < max_content_length) + Invalid_message_size >>=? fun () -> + MBytes.set_int16 header_buf 0 encrypted_len ; + P2p_io_scheduler.write fd header_buf >>=? fun () -> + P2p_io_scheduler.write fd encrypted_message >>=? fun () -> + return () + + let encode_message st msg = + try return (Data_encoding.Binary.to_bytes st.encoding msg) + with _ -> fail Encoding_error + + let rec worker_loop st = + Lwt_unix.yield () >>= fun () -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_pipe.pop st.messages >>= fun (msg, wakener) -> + encode_message st msg >>=? fun buf -> + write_chunk st.conn buf >>= fun res -> + iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; + Lwt.return res + end >>= function + | Ok () -> + worker_loop st + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + Lwt.return_unit + | Error err -> + lwt_log_error + "@[Error while writing to %a@ %a@]" + Connection_info.pp st.conn.info pp_print_error err >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + + let run ?size conn encoding canceler = + let st = + { canceler ; conn ; encoding ; + messages = Lwt_pipe.create ?size () ; + worker = Lwt.return_unit ; + } in + Canceler.on_cancel st.canceler begin fun () -> + Lwt_pipe.close st.messages ; + Lwt.return_unit + end ; + st.worker <- + Lwt_utils.worker "writer" + (fun () -> worker_loop st) + (fun () -> Canceler.cancel st.canceler) ; + st + + let shutdown st = + Canceler.cancel st.canceler >>= fun () -> + st.worker + +end + +type 'msg t = { + conn : connection ; + reader : 'msg Reader.t ; + writer : 'msg Writer.t ; +} + +let pp ppf { conn } = Connection_info.pp ppf conn.info +let info { conn } = conn.info + +let accept + ?incoming_message_queue_size ?outgoing_message_queue_size + (fd, info, cryptobox_data) encoding = + Lwt_utils.protect begin fun () -> + Ack.write fd true >>=? fun () -> + Ack.read fd + end ~on_error:begin fun err -> + P2p_io_scheduler.close fd >>= fun _ -> + Lwt.return (Error err) + end >>=? fun accepted -> + fail_unless accepted Rejected >>=? fun () -> + let canceler = Canceler.create () in + let conn = { fd ; info ; cryptobox_data } in + let reader = + Reader.run ?size:incoming_message_queue_size conn encoding canceler + and writer = + Writer.run ?size:outgoing_message_queue_size conn encoding canceler in + let conn = { conn ; reader ; writer } in + Canceler.on_cancel canceler begin fun () -> + P2p_io_scheduler.close fd >>= fun _ -> + Lwt.return_unit + end ; + return conn + +exception Not_available +exception Connection_closed + +let catch_closed_pipe f = + Lwt.catch f begin function + | Lwt_pipe.Closed -> fail P2p_io_scheduler.Connection_closed + | exn -> fail (Exn exn) + end + +let is_writable { writer } = + not (Lwt_pipe.is_full writer.messages) +let wait_writable { writer } = + Lwt_pipe.not_full writer.messages +let write { writer } msg = + catch_closed_pipe begin fun () -> + Lwt_pipe.push writer.messages (msg, None) >>= return + end +let write_sync { writer } msg = + catch_closed_pipe begin fun () -> + let waiter, wakener = Lwt.wait () in + Lwt_pipe.push writer.messages (msg, Some wakener) >>= fun () -> + waiter + end +let write_now { writer } msg = + try Ok (Lwt_pipe.push_now writer.messages (msg, None)) + with Lwt_pipe.Closed -> Error [P2p_io_scheduler.Connection_closed] + +let is_readable { reader } = + not (Lwt_pipe.is_empty reader.messages) +let wait_readable { reader } = + catch_closed_pipe begin fun () -> + Lwt_pipe.values_available reader.messages >>= return + end +let read { reader } = + catch_closed_pipe begin fun () -> + Lwt_pipe.pop reader.messages + end +let read_now { reader } = + try Lwt_pipe.pop_now reader.messages + with Lwt_pipe.Closed -> Some (Error [P2p_io_scheduler.Connection_closed]) + +let stat { conn = { fd } } = P2p_io_scheduler.stat fd + +let close ?(wait = false) st = + begin + if not wait then Lwt.return_unit + else begin + Lwt_pipe.close st.reader.messages ; + Lwt_pipe.close st.writer.messages ; + st.writer.worker + end + end >>= fun () -> + Reader.shutdown st.reader >>= fun () -> + Writer.shutdown st.writer >>= fun () -> + P2p_io_scheduler.close st.conn.fd >>= fun _ -> + Lwt.return_unit diff --git a/src/node/net/p2p_connection.mli b/src/node/net/p2p_connection.mli new file mode 100644 index 000000000..8d335a68c --- /dev/null +++ b/src/node/net/p2p_connection.mli @@ -0,0 +1,119 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** This modules adds message encoding and encryption to + [P2p_io_scheduler]'s generic throttled connections. + + Each connection have an associated internal read (resp. write) + queue containing messages (of type ['msg]), whose size can be + limited by providing corresponding arguments to [accept]. +*) + +open P2p_types + +(** {1 Types} *) + +type error += Decipher_error +type error += Invalid_message_size +type error += Encoding_error +type error += Decoding_error +type error += Rejected +type error += Myself of Id_point.t +type error += Not_enough_proof_of_work of Gid.t + +type authenticated_fd +(** Type of a connection that successfully passed the authentication + phase, but has not been accepted yet. *) + +type 'msg t +(** Type of an accepted connection, parametrized by the type of + messages exchanged between peers. *) + +val pp : Format.formatter -> 'msg t -> unit +val info: 'msg t -> Connection_info.t + +(** {1 Low-level functions (do not use directly)} *) + +val authenticate: + proof_of_work_target:Crypto_box.target -> + incoming:bool -> + P2p_io_scheduler.connection -> Point.t -> + ?listening_port: int -> + Identity.t -> Version.t list -> + (Connection_info.t * authenticated_fd) tzresult Lwt.t +(** (Low-level) (Cancelable) Authentication function of a remote + peer. Used in [P2p_connection_pool], to promote a + [P2P_io_scheduler.connection] into an [authenticated_fd] (auth + correct, acceptation undecided). *) + +val kick: authenticated_fd -> unit Lwt.t +(** (Low-level) (Cancelable) [kick afd] notifies the remote peer that + we refuse this connection and then closes [afd]. Used in + [P2p_connection_pool] to reject an [aunthenticated_fd] which we do + not want to connect to for some reason. *) + +val accept: + ?incoming_message_queue_size:int -> + ?outgoing_message_queue_size:int -> + authenticated_fd -> 'msg Data_encoding.t -> 'msg t tzresult Lwt.t +(** (Low-level) (Cancelable) Accepts a remote peer given an + authenticated_fd. Used in [P2p_connection_pool], to promote an + [authenticated_fd] to the status of an active peer. *) + +(** {1 IO functions on connections} *) + +(** {2 Output functions} *) + +val is_writable: 'msg t -> bool +(** [is_writable conn] is [true] iff [conn] internal write queue is + not full. *) + +val wait_writable: 'msg t -> unit Lwt.t +(** (Cancelable) [wait_writable conn] returns when [conn]'s internal + write queue becomes writable (i.e. not full). *) + +val write: 'msg t -> 'msg -> unit tzresult Lwt.t +(** [write conn msg] returns when [msg] has successfully been added to + [conn]'s internal write queue or fails with a corresponding + error. *) + +val write_now: 'msg t -> 'msg -> bool tzresult +(** [write_now conn msg] is [Ok true] if [msg] has been added to + [conn]'s internal write queue, [Ok false] if [msg] has been + dropped, or fails with a correponding error otherwise. *) + +val write_sync: 'msg t -> 'msg -> unit tzresult Lwt.t +(** [write_sync conn msg] returns when [msg] has been successfully + sent to the remote end of [conn], or fails accordingly. *) + +(** {2 Input functions} *) + +val is_readable: 'msg t -> bool +(** [is_readable conn] is [true] iff [conn] internal read queue is not + empty. *) + +val wait_readable: 'msg t -> unit tzresult Lwt.t +(** (Cancelable) [wait_readable conn] returns when [conn]'s internal + read queue becomes readable (i.e. not empty). *) + +val read: 'msg t -> 'msg tzresult Lwt.t +(** [read conn msg] returns when [msg] has successfully been popped + from [conn]'s internal read queue or fails with a corresponding + error. *) + +val read_now: 'msg t -> 'msg tzresult option +(** [read_now conn msg] is [Some msg] if [conn]'s internal read queue + is not empty, [None] if it is empty, or fails with a correponding + error otherwise. *) + +val stat: 'msg t -> Stat.t +(** [stat conn] is a snapshot of current bandwidth usage for + [conn]. *) + +val close: ?wait:bool -> 'msg t -> unit Lwt.t diff --git a/src/node/net/p2p_types.ml b/src/node/net/p2p_types.ml index 846ba75a3..5ed7ded49 100644 --- a/src/node/net/p2p_types.ml +++ b/src/node/net/p2p_types.ml @@ -11,6 +11,37 @@ open Logging.Net module Canceler = Lwt_utils.Canceler +module Version = struct + type t = { + name : string ; + major : int ; + minor : int ; + } + + let encoding = + let open Data_encoding in + conv + (fun { name; major; minor } -> (name, major, minor)) + (fun (name, major, minor) -> { name; major; minor }) + (obj3 + (req "name" string) + (req "major" int8) + (req "minor" int8)) + + (* the common version for a pair of peers, if any, is the maximum one, + in lexicographic order *) + let common la lb = + let la = List.sort (fun l r -> compare r l) la in + let lb = List.sort (fun l r -> compare r l) lb in + let rec find = function + | [], _ | _, [] -> None + | ((a :: ta) as la), ((b :: tb) as lb) -> + if a = b then Some a + else if a < b then find (ta, lb) + else find (la, tb) + in find (la, lb) +end + module Stat = struct type t = { @@ -44,3 +75,151 @@ module Gid = struct module Set = Set.Make (Crypto_box.Public_key_hash) module Table = Hash.Hash_table (Crypto_box.Public_key_hash) end + +(* public types *) +type addr = Ipaddr.V6.t +type port = int + +module Point = struct + + module T = struct + (* A net point (address x port). *) + type t = addr * port + let compare (a1, p1) (a2, p2) = + match Ipaddr.V6.compare a1 a2 with + | 0 -> p1 - p2 + | x -> x + let equal p1 p2 = compare p1 p2 = 0 + let hash = Hashtbl.hash + let pp ppf (addr, port) = + Format.fprintf ppf "[%a]:%d" Ipaddr.V6.pp_hum addr port + let pp_opt ppf = function + | None -> Format.pp_print_string ppf "none" + | Some point -> pp ppf point + + let is_local (addr, _) = Ipaddr.V6.is_private addr + let is_global (addr, _) = not @@ Ipaddr.V6.is_private addr + + let to_sockaddr (addr, port) = Unix.(ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port)) + + let encoding = + let open Data_encoding in + conv + (fun (addr, port) -> Ipaddr.V6.to_string addr, port) + (fun (addr, port) -> Ipaddr.V6.of_string_exn addr, port) + (obj2 + (req "addr" string) + (req "port" int16)) + end + + include T + + (* Run-time point-or-gid indexed storage, one point is bound to at + most one gid, which is the invariant we want to keep both for the + connected peers table and the known peers one *) + + module Map = Map.Make (T) + module Set = Set.Make (T) + module Table = Hashtbl.Make (T) + +end + +module Id_point = struct + + module T = struct + (* A net point (address x port). *) + type t = addr * port option + let empty = Ipaddr.V6.unspecified, None + let compare (a1, p1) (a2, p2) = + match Ipaddr.V6.compare a1 a2 with + | 0 -> Pervasives.compare p1 p2 + | x -> x + let equal p1 p2 = compare p1 p2 = 0 + let hash = Hashtbl.hash + let pp ppf (addr, port) = + match port with + | None -> + Format.fprintf ppf "[%a]:??" Ipaddr.V6.pp_hum addr + | Some port -> + Format.fprintf ppf "[%a]:%d" Ipaddr.V6.pp_hum addr port + let pp_opt ppf = function + | None -> Format.pp_print_string ppf "none" + | Some point -> pp ppf point + + let is_local (addr, _) = Ipaddr.V6.is_private addr + let is_global (addr, _) = not @@ Ipaddr.V6.is_private addr + + let encoding = + let open Data_encoding in + conv + (fun (addr, port) -> Ipaddr.V6.to_bytes addr, port) + (fun (addr, port) -> Ipaddr.V6.of_bytes_exn addr, port) + (obj2 + (req "addr" string) + (opt "port" int16)) + end + + include T + + (* Run-time point-or-gid indexed storage, one point is bound to at + most one gid, which is the invariant we want to keep both for the + connected peers table and the known peers one *) + + module Map = Map.Make (T) + module Set = Set.Make (T) + module Table = Hashtbl.Make (T) + +end + +module Identity = struct + + type t = { + gid : Gid.t ; + public_key : Crypto_box.public_key ; + secret_key : Crypto_box.secret_key ; + proof_of_work_stamp : Crypto_box.nonce ; + } + + let encoding = + let open Data_encoding in + conv + (fun { public_key ; secret_key ; proof_of_work_stamp } -> + (public_key, secret_key, proof_of_work_stamp)) + (fun (public_key, secret_key, proof_of_work_stamp) -> + let gid = Crypto_box.hash public_key in + { gid ; public_key ; secret_key ; proof_of_work_stamp }) + (obj3 + (req "public_key" Crypto_box.public_key_encoding) + (req "secret_key" Crypto_box.secret_key_encoding) + (req "proof_of_work_stamp" Crypto_box.nonce_encoding)) + + let generate target = + let secret_key, public_key, gid = Crypto_box.random_keypair () in + let proof_of_work_stamp = + Crypto_box.generate_proof_of_work public_key target in + { gid ; public_key ; secret_key ; proof_of_work_stamp } + +end + +module Connection_info = struct + + type t = { + incoming : bool; + gid : Gid.t; + id_point : Id_point.t; + remote_socket_port : port; + versions : Version.t list ; + } + + let pp ppf + { incoming ; id_point = (remote_addr, remote_port) ; gid } = + Format.fprintf ppf "%a:%a {%a}%s" + Ipaddr.V6.pp_hum remote_addr + (fun ppf port -> + match port with + | None -> Format.pp_print_string ppf "??" + | Some port -> Format.pp_print_int ppf port) remote_port + Gid.pp gid + (if incoming then " (incoming)" else "") + +end diff --git a/src/node/net/p2p_types.mli b/src/node/net/p2p_types.mli index 27e596452..f85ed323a 100644 --- a/src/node/net/p2p_types.mli +++ b/src/node/net/p2p_types.mli @@ -9,6 +9,93 @@ module Canceler = Lwt_utils.Canceler +(** Protocol version *) + +module Version : sig + type t = { + name : string ; + major : int ; + minor : int ; + } + (** Type of a protocol version. *) + + val encoding : t Data_encoding.t + val common: t list -> t list -> t option +end + + +(** Gid, i.e. persistent peer identifier *) + +module Gid : sig + type t = Crypto_box.Public_key_hash.t + (** Type of a gid, a public key hash. *) + + val compare : t -> t -> int + val equal : t -> t -> bool + val pp : Format.formatter -> t -> unit + val encoding : t Data_encoding.t + module Map : Map.S with type key = t + module Set : Set.S with type elt = t + module Table : Hashtbl.S with type key = t +end + +type addr = Ipaddr.V6.t +type port = int + + +(** Point, i.e. socket address *) + +module Point : sig + type t = addr * port + val compare : t -> t -> int + val pp : Format.formatter -> t -> unit + val pp_opt : Format.formatter -> t option -> unit + val encoding : t Data_encoding.t + val is_local : t -> bool + val is_global : t -> bool + val to_sockaddr : t -> Unix.sockaddr + module Map : Map.S with type key = t + module Set : Set.S with type elt = t + module Table : Hashtbl.S with type key = t +end + +(** Point representing a reachable socket address *) + +module Id_point : sig + type t = addr * port option + val compare : t -> t -> int + val equal : t -> t -> bool + val pp : Format.formatter -> t -> unit + val pp_opt : Format.formatter -> t option -> unit + val encoding : t Data_encoding.t + val is_local : t -> bool + val is_global : t -> bool + module Map : Map.S with type key = t + module Set : Set.S with type elt = t + module Table : Hashtbl.S with type key = t +end + + +(** Identity *) + +module Identity : sig + type t = { + gid : Gid.t ; + public_key : Crypto_box.public_key ; + secret_key : Crypto_box.secret_key ; + proof_of_work_stamp : Crypto_box.nonce ; + } + (** Type of an identity, comprising a gid, a crypto keypair, and a + proof of work stamp with enough difficulty so that the network + accept this identity as genuine. *) + + val encoding : t Data_encoding.t + + val generate : Crypto_box.target -> t + (** [generate target] is a freshly minted identity whose proof of + work stamp difficulty is at least equal to [target]. *) +end + (** Bandwidth usage statistics *) @@ -24,3 +111,19 @@ module Stat : sig val pp: Format.formatter -> t -> unit end + +(** Information about a connection *) + +module Connection_info : sig + + type t = { + incoming : bool; + gid : Gid.t; + id_point : Id_point.t; + remote_socket_port : port; + versions : Version.t list ; + } + + val pp: Format.formatter -> t -> unit + +end diff --git a/test/Makefile b/test/Makefile index 36e6a356d..dd5f5c8fc 100644 --- a/test/Makefile +++ b/test/Makefile @@ -3,7 +3,8 @@ TESTS := \ data-encoding \ store context state \ basic basic.sh \ - p2p-io-scheduler + p2p-io-scheduler \ + p2p-connection \ all: test @@ -182,7 +183,7 @@ clean:: ############################################################################ ## p2p test program -.PHONY:build-test-io_schduler +.PHONY:build-test-p2p-io-scheduler run-test-p2p-io-scheduler build-test-p2p-io-scheduler: test-p2p-io-scheduler run-test-p2p-io-scheduler: ./test-p2p-io-scheduler \ @@ -190,16 +191,30 @@ run-test-p2p-io-scheduler: --max-upload-speed $$((1 << 18)) \ --max-download-speed $$((1 << 20)) +.PHONY:build-test-p2p-connection run-test-p2p-connection +build-test-p2p-connection: test-p2p-connection +run-test-p2p-connection: + ./test-p2p-connection + TEST_P2P_IO_SCHEDULER_IMPLS = \ lib/process.ml \ test_p2p_io_scheduler.ml +TEST_P2P_CONNECTION_IMPLS = \ + lib/process.ml \ + test_p2p_connection.ml + ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}: ${NODELIB} test-p2p-io-scheduler: ${NODELIB} ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx} ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ +${TEST_P2P_CONNECTION_IMPLS:.ml=.cmx}: ${NODELIB} +test-p2p-connection: ${NODELIB} ${TEST_P2P_CONNECTION_IMPLS:.ml=.cmx} + ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ + clean:: - rm -f test-p2p-io_scheduler + -rm -f test-p2p-io_scheduler + -rm -f test-p2p-connection ############################################################################ ## lwt pipe test program diff --git a/test/test_p2p_connection.ml b/test/test_p2p_connection.ml new file mode 100644 index 000000000..e0d84cbc8 --- /dev/null +++ b/test/test_p2p_connection.ml @@ -0,0 +1,204 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* TODO Use Kaputt on the client side and remove `assert` from the + server. *) + +open Error_monad +open P2p_types +include Logging.Make (struct let name = "test-p2p-connection" end) + +let proof_of_work_target = + Crypto_box.make_target [Int64.shift_left 1L 48] +let id1 = Identity.generate proof_of_work_target +let id2 = Identity.generate proof_of_work_target + +let id0 = + (* Luckilly, this will be an insuficient proof of work! *) + Identity.generate (Crypto_box.make_target []) + +let versions = Version.[{ name = "TEST" ; minor = 0 ; major = 0 }] + +let rec listen ?port addr = + let tentative_port = + match port with + | None -> 1024 + Random.int 8192 + | Some port -> port in + let uaddr = Ipaddr_unix.V6.to_inet_addr addr in + let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in + Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ; + Lwt.catch begin fun () -> + Lwt_unix.Versioned.bind_2 main_socket + (ADDR_INET (uaddr, tentative_port)) >>= fun () -> + Lwt_unix.listen main_socket 1 ; + Lwt.return (main_socket, tentative_port) + end begin function + | Unix.Unix_error + ((Unix.EADDRINUSE | Unix.EADDRNOTAVAIL), _, _) when port = None -> + listen addr + | exn -> Lwt.fail exn + end + +let raw_accept sched main_socket = + Lwt_unix.accept main_socket >>= fun (fd, sockaddr) -> + let fd = P2p_io_scheduler.register sched fd in + let point = + match sockaddr with + | Lwt_unix.ADDR_UNIX _ -> assert false + | Lwt_unix.ADDR_INET (addr, port) -> + Ipaddr_unix.V6.of_inet_addr_exn addr, port in + Lwt.return (fd, point) + +let accept sched main_socket = + raw_accept sched main_socket >>= fun (fd, point) -> + P2p_connection.authenticate + ~proof_of_work_target + ~incoming:true fd point id1 versions + +let raw_connect sched addr port = + let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let uaddr = + Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in + Lwt_unix.connect fd uaddr >>= fun () -> + let fd = P2p_io_scheduler.register sched fd in + Lwt.return fd + +let connect sched addr port id = + raw_connect sched addr port >>= fun fd -> + P2p_connection.authenticate + ~proof_of_work_target + ~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) -> + assert (not info.incoming) ; + assert (Gid.compare info.gid id1.gid = 0) ; + return auth_fd + +let simple_msg = + MBytes.create (1 lsl 1) + +let is_rejected = function + | Error [P2p_connection.Rejected] -> true + | Ok _ | Error _ -> false + +let is_connection_closed = function + | Error [P2p_io_scheduler.Connection_closed] -> true + | Ok _ | Error _ -> false + +let bytes_encoding = Data_encoding.Variable.bytes + +let server main_socket = + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + (* Low-level test. *) + raw_accept sched main_socket >>= fun (fd, point) -> + lwt_log_notice "Low_level" >>= fun () -> + P2p_io_scheduler.write fd simple_msg >>=? fun () -> + P2p_io_scheduler.close fd >>=? fun _ -> + lwt_log_notice "Low_level OK" >>= fun () -> + (* Kick the first connection. *) + accept sched main_socket >>=? fun (info, auth_fd) -> + lwt_log_notice "Kick" >>= fun () -> + assert (info.incoming) ; + assert (Gid.compare info.gid id2.gid = 0) ; + P2p_connection.kick auth_fd >>= fun () -> + lwt_log_notice "Kick OK" >>= fun () -> + (* Let's be rejected. *) + accept sched main_socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd bytes_encoding >>= fun conn -> + assert (is_rejected conn) ; + lwt_log_notice "Kicked OK" >>= fun () -> + (* Accept and send a single message. *) + accept sched main_socket >>=? fun (info, auth_fd) -> + lwt_log_notice "Single" >>= fun () -> + P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + lwt_log_notice "Single OK" >>= fun () -> + (* Accept and send a single message, while the client expected 2. *) + accept sched main_socket >>=? fun (info, auth_fd) -> + lwt_log_notice "Early close (read)" >>= fun () -> + P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + lwt_log_notice "Early close (read) OK" >>= fun () -> + (* Accept and wait for the client to close the connection. *) + accept sched main_socket >>=? fun (info, auth_fd) -> + lwt_log_notice "Early close (write)" >>= fun () -> + P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> + P2p_connection.close conn >>= fun _stat -> + lwt_log_notice "Early close (write) OK" >>= fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + Lwt_unix.sleep 0.2 >>= fun () -> + lwt_log_notice "Success" >>= fun () -> + return () + +let client addr port = + let msg = MBytes.create (MBytes.length simple_msg) in + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + raw_connect sched addr port >>= fun fd -> + P2p_io_scheduler.read_full fd msg >>=? fun () -> + assert (MBytes.compare simple_msg msg = 0) ; + P2p_io_scheduler.close fd >>=? fun () -> + lwt_log_notice "Low_level OK" >>= fun () -> + (* let's be rejected. *) + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd bytes_encoding >>= fun conn -> + assert (is_rejected conn) ; + lwt_log_notice "Kick OK" >>= fun () -> + (* let's reject! *) + lwt_log_notice "Kicked" >>= fun () -> + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.kick auth_fd >>= fun () -> + (* let's exchange a simple message. *) + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> + P2p_connection.read conn >>=? fun msg -> + assert (MBytes.compare simple_msg msg = 0) ; + P2p_connection.close conn >>= fun _stat -> + lwt_log_notice "Simple OK" >>= fun () -> + (* let's detect a closed connection on `read`. *) + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> + P2p_connection.read conn >>=? fun msg -> + assert (MBytes.compare simple_msg msg = 0) ; + P2p_connection.read conn >>= fun msg -> + assert (is_connection_closed msg) ; + P2p_connection.close conn >>= fun _stat -> + lwt_log_notice "Early close (read) OK" >>= fun () -> + (* let's detect a closed connection on `write`. *) + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> + Lwt_unix.sleep 0.1 >>= fun () -> + P2p_connection.write_sync conn simple_msg >>= fun unit -> + assert (is_connection_closed unit) ; + P2p_connection.close conn >>= fun _stat -> + lwt_log_notice "Early close (write) OK" >>= fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + lwt_log_notice "Success" >>= fun () -> + return () + +let default_addr = Ipaddr.V6.localhost + +let main () = + listen default_addr >>= fun (main_socket, port) -> + let server = + Process.detach ~prefix:"server " begin fun () -> + Process.handle_error begin fun () -> + server main_socket + end + end in + let client = + Process.detach ~prefix:"client " begin fun () -> + Lwt_utils.safe_close main_socket >>= fun () -> + Process.handle_error begin fun () -> + client default_addr port + end + end in + Process.wait [ server ; client ] + +let () = + Lwt_main.run (main ())