From e1692ed9bf69846b538fb9291cd368cc5fc3378f Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Sat, 14 Jan 2017 13:14:17 +0100 Subject: [PATCH] Shell: use the new P2P backend --- src/Makefile | 2 +- src/attacker/attacker_minimal.ml | 211 ++-- src/node/net/p2p.ml | 1646 +++++------------------------- src/node/net/p2p.mli | 189 ++-- src/node/shell/tezos_p2p.ml | 52 +- src/node/shell/tezos_p2p.mli | 60 +- src/node_main.ml | 121 ++- 7 files changed, 650 insertions(+), 1631 deletions(-) diff --git a/src/Makefile b/src/Makefile index 64584fdbe..2617fc8af 100644 --- a/src/Makefile +++ b/src/Makefile @@ -336,13 +336,13 @@ NODE_IMPLS := \ NODE_PACKAGES := \ $(COMPILER_PACKAGES) \ calendar \ + cmdliner \ cohttp.lwt \ dynlink \ git \ ipv6-multicast \ irmin.unix \ ocplib-resto.directory \ - cmdliner \ EMBEDDED_NODE_PROTOCOLS := \ diff --git a/src/attacker/attacker_minimal.ml b/src/attacker/attacker_minimal.ml index d16dd60e7..abafdf4e1 100644 --- a/src/attacker/attacker_minimal.ml +++ b/src/attacker/attacker_minimal.ml @@ -8,8 +8,7 @@ (**************************************************************************) open Format -open Lwt -open Tezos_p2p +include Logging.Make(struct let name = "attacker" end) module Proto = Client_embedded_proto_bootstrap module Ed25519 = Proto.Local_environment.Environment.Ed25519 @@ -104,141 +103,170 @@ let ballot_forged period prop vote = operations = [ballot] }) in forge { net_id = network } op +let identity = P2p_types.Identity.generate Crypto_box.default_target + (* connect to the network, run an action and then disconnect *) let try_action addr port action = - let limits : P2p.limits = { - max_message_size = 1 lsl 16 ; - peer_answer_timeout = 10. ; - expected_connections = 1; - min_connections = 1 ; - max_connections = 1 ; - blacklist_time = 0. ; - } in - let config : P2p.config = { - incoming_port = None ; - discovery_port = None ; - known_peers = [(addr, port)] ; - peers_file = Filename.temp_file "peers_file" ".txt"; - closed_network = true ; - } in - bootstrap ~config ~limits >>= fun net -> - let peer = - match peers net with - | [peer] -> peer - | _ -> Pervasives.failwith "" in - action net peer >>= fun () -> shutdown net + let socket = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let uaddr = Ipaddr_unix.V6.to_inet_addr addr in + Lwt_unix.connect socket (Lwt_unix.ADDR_INET (uaddr, port)) >>= fun () -> + let io_sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 14) () in + let conn = P2p_io_scheduler.register io_sched socket in + P2p_connection.authenticate + ~proof_of_work_target:Crypto_box.default_target + ~incoming:false + conn + (addr, port) + identity Tezos_p2p.Raw.supported_versions >>=? fun (_, auth_fd) -> + P2p_connection.accept auth_fd Tezos_p2p.Raw.encoding >>= function + | Error _ -> failwith "Connection rejected by peer." + | Ok conn -> + action conn >>=? fun () -> + P2p_connection.close conn >>= fun () -> + return () let replicate n x = let rec replicate_acc acc n x = if n <= 0 then acc else replicate_acc (x :: acc) (n-1) x in replicate_acc [] n x -let request_block_times block_hash n net peer = - let open Block_hash in - let () = printf "requesting %a block %a times\n" - pp_short block_hash pp_print_int n in - let block_hashes = replicate n block_hash in - send net peer (Get_blocks block_hashes) +let send conn (msg : Tezos_p2p.msg) = + P2p_connection.write conn (Tezos_p2p.Raw.Message msg) -let request_op_times op_signed n net peer = +let request_block_times block_hash n conn = + let open Block_hash in + lwt_log_notice + "requesting %a block %d times" + pp_short block_hash n >>= fun () -> + let block_hashes = replicate n block_hash in + send conn (Get_blocks block_hashes) + +let request_op_times op_signed n conn = let open Operation_hash in let op_hash = hash_bytes [op_signed] in - let () = printf "sending %a transaction\n" pp_short op_hash in - send net peer (Operation op_signed) >>= fun () -> - let () = printf "requesting %a transaction %a times\n" - pp_short op_hash pp_print_int n in + lwt_log_notice "sending %a transaction" pp_short op_hash >>= fun () -> + send conn (Operation op_signed) >>=? fun () -> + lwt_log_notice + "requesting %a transaction %d times" + pp_short op_hash n >>= fun () -> let op_hashes = replicate n op_hash in - send net peer (Get_operations op_hashes) + send conn (Get_operations op_hashes) -let send_block_size n net peer = +let send_block_size n conn = let bytes = MBytes.create n in let open Block_hash in - let () = printf "propagating fake %a byte block %a\n" - pp_print_int n pp_short (hash_bytes [bytes]) in - send net peer (Block bytes) + lwt_log_notice + "propagating fake %d byte block %a" n pp_short (hash_bytes [bytes]) >>= fun () -> + send conn (Block bytes) -let send_protocol_size n net peer = +let send_protocol_size n conn = let bytes = MBytes.create n in let open Protocol_hash in - let () = printf "propagating fake %a byte protocol %a\n" - pp_print_int n pp_short (hash_bytes [bytes]) in - send net peer (Protocol bytes) + lwt_log_notice + "propagating fake %d byte protocol %a" + n pp_short (hash_bytes [bytes]) >>= fun () -> + send conn (Protocol bytes) -let send_operation_size n net peer = +let send_operation_size n conn = let op_faked = MBytes.create n in let op_hashed = Operation_hash.hash_bytes [op_faked] in - let () = printf "propagating fake %a byte operation %a\n" - pp_print_int n Operation_hash.pp_short op_hashed in - send net peer (Operation op_faked) >>= fun () -> + lwt_log_notice + "propagating fake %d byte operation %a" + n Operation_hash.pp_short op_hashed >>= fun () -> + send conn (Operation op_faked) >>=? fun () -> let block = signed (block_forged [op_hashed]) in let block_hashed = Block_hash.hash_bytes [block] in - let () = printf "propagating block %a with operation\n" - Block_hash.pp_short block_hashed in - send net peer (Block block) + lwt_log_notice + "propagating block %a with operation" + Block_hash.pp_short block_hashed >>= fun () -> + send conn (Block block) -let send_operation_bad_signature () net peer = +let send_operation_bad_signature () conn = let open Operation_hash in let signed_wrong_op = signed_wrong (tx_forged 5L 1L) in let hashed_wrong_op = hash_bytes [signed_wrong_op] in - let () = printf "propagating operation %a with wrong signature\n" - pp_short hashed_wrong_op in - send net peer (Operation signed_wrong_op) >>= fun () -> + lwt_log_notice + "propagating operation %a with wrong signature" + pp_short hashed_wrong_op >>= fun () -> + send conn (Operation signed_wrong_op) >>=? fun () -> let block = signed (block_forged [hashed_wrong_op]) in let block_hashed = Block_hash.hash_bytes [block] in - let () = printf "propagating block %a with operation\n" - Block_hash.pp_short block_hashed in - send net peer (Block block) + lwt_log_notice + "propagating block %a with operation" + Block_hash.pp_short block_hashed >>= fun () -> + send conn (Block block) -let send_block_bad_signature () net peer = +let send_block_bad_signature () conn = let open Block_hash in let signed_wrong_block = signed_wrong (block_forged []) in - let () = printf "propagating block %a with wrong signature\n" - pp_short (hash_bytes [signed_wrong_block]) in - send net peer (Block signed_wrong_block) + lwt_log_notice + "propagating block %a with wrong signature" + pp_short (hash_bytes [signed_wrong_block]) >>= fun () -> + send conn (Block signed_wrong_block) -let double_spend () net peer = +let double_spend () conn = let spend account = let op_signed = signed (tx_forged ~dest:account 199999999L 1L) in let op_hashed = Operation_hash.hash_bytes [op_signed] in let block_signed = signed (block_forged [op_hashed]) in let block_hashed = Block_hash.hash_bytes [block_signed] in - let () = printf "propagating operation %a\n" - Operation_hash.pp_short op_hashed in - send net peer (Operation op_signed) >>= fun () -> - let () = printf "propagating block %a\n" - Block_hash.pp_short block_hashed in - send net peer (Block block_signed) in - spend destination_account <&> spend another_account + lwt_log_notice + "propagating operation %a" + Operation_hash.pp_short op_hashed >>= fun () -> + send conn (Operation op_signed) >>=? fun () -> + lwt_log_notice + "propagating block %a" + Block_hash.pp_short block_hashed >>= fun () -> + send conn (Block block_signed) in + spend destination_account >>=? fun () -> + spend another_account -let long_chain n net peer = - let () = printf "propogating %a blocks\n" - pp_print_int n in +let long_chain n conn = + lwt_log_notice "propogating %d blocks" n >>= fun () -> let prev_ref = ref genesis_block_hashed in - let rec loop k = if k < 1 then return_unit else + let rec loop k = + if k < 1 then + return () + else let block = signed (block_forged ~prev:!prev_ref []) in - let () = prev_ref := Block_hash.hash_bytes [block] in - send net peer (Block block) >>= fun () -> loop (k-1) in + prev_ref := Block_hash.hash_bytes [block] ; + send conn (Block block) >>=? fun () -> + loop (k-1) in loop n -let lots_transactions amount fee n net peer = +let lots_transactions amount fee n conn = let signed_op = signed (tx_forged amount fee) in - let rec loop k = if k < 1 then return_unit else - send net peer (Operation signed_op) >>= fun () -> loop (k-1) in + let rec loop k = + if k < 1 then + return () + else + send conn (Operation signed_op) >>=? fun () -> + loop (k-1) in let ops = replicate n (Operation_hash.hash_bytes [signed_op]) in let signed_block = signed (block_forged ops) in - let () = printf "propogating %a transactions\n" - pp_print_int n in - loop n >>= fun () -> - let () = printf "propagating block %a with wrong signature\n" - Block_hash.pp_short (Block_hash.hash_bytes [signed_block]) in - send net peer (Block signed_block) + lwt_log_notice "propogating %d transactions" n >>= fun () -> + loop n >>=? fun () -> + lwt_log_notice + "propagating block %a with wrong signature" + Block_hash.pp_short (Block_hash.hash_bytes [signed_block]) >>= fun () -> + send conn (Block signed_block) let main () = - let addr = Ipaddr.V4 Ipaddr.V4.localhost in + let addr = Ipaddr.V6.localhost in let port = 9732 in let run_action action = try_action addr port action in - let run_cmd_unit lwt = Arg.Unit (fun () -> Lwt_main.run (lwt ())) in - let run_cmd_int_suffix lwt = Arg.String (fun str -> + let run_cmd_unit lwt = + Arg.Unit begin fun () -> + Lwt_main.run begin + lwt () >>= function + | Ok () -> Lwt.return_unit + | Error err -> + lwt_log_error "Error: %a" pp_print_error err >>= fun () -> + Lwt.return_unit + end + end in + let run_cmd_int_suffix lwt = + Arg.String begin fun str -> let last = str.[String.length str - 1] in let init = String.sub str 0 (String.length str - 1) in let n = @@ -249,7 +277,14 @@ let main () = else if last == 'g' || last == 'G' then int_of_string init * 1 lsl 30 else int_of_string str in - Lwt_main.run (lwt n)) in + Lwt_main.run begin + lwt n >>= function + | Ok () -> Lwt.return_unit + | Error err -> + lwt_log_error "Error: %a" pp_print_error err >>= fun () -> + Lwt.return_unit + end + end in let cmds = [( "-1", run_cmd_int_suffix (run_action << request_block_times genesis_block_hashed), diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 3d21775d5..09386c6ff 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -7,1418 +7,324 @@ (* *) (**************************************************************************) -module LU = Lwt_unix -module LC = Lwt_condition +include P2p_types -open Lwt.Infix -open Logging.Net - -type error += Encoding_error -type error += Message_too_big -type error += Write_would_block -type error += Decipher_error -type error += Canceled -type error += Timeout - -(* public types *) -type addr = Ipaddr.t -type port = int -type version = { - name : string ; - major : int ; - minor : int ; -} - -let version_encoding = - let open Data_encoding in - conv - (fun { name; major; minor } -> (name, major, minor)) - (fun (name, major, minor) -> { name; major; minor }) - (obj3 - (req "name" string) - (req "major" int8) - (req "minor" int8)) - -type limits = { - max_message_size : int ; - peer_answer_timeout : float ; - expected_connections : int ; - min_connections : int ; - max_connections : int ; - blacklist_time : float ; -} type config = { - incoming_port : port option ; - discovery_port : port option ; - known_peers : (addr * port) list ; + listening_port : port option ; + listening_addr : addr option ; + trusted_points : Point.t list ; peers_file : string ; closed_network : bool ; + identity : Identity.t ; + proof_of_work_target : Crypto_box.target ; } -(* The global net identificator. *) -type gid = string +type limits = { -let gid_length = 16 + authentification_timeout : float ; -let pp_gid ppf gid = - Format.pp_print_string ppf (Hex_encode.hex_encode gid) + min_connections : int ; + expected_connections : int ; + max_connections : int ; -let zero_gid = String.make 16 '\x00' + backlog : int ; + max_incoming_connections : int ; -(* the common version for a pair of peers, if any, is the maximum one, - in lexicographic order *) -let common_version la lb = - let la = List.sort (fun l r -> compare r l) la in - let lb = List.sort (fun l r -> compare r l) lb in - let rec find = function - | [], _ | _, [] -> None - | ((a :: ta) as la), ((b :: tb) as lb) -> - if a = b then Some a - else if a < b then find (ta, lb) - else find (la, tb) - in find (la, lb) + max_download_speed : int option ; + max_upload_speed : int option ; -(* A net point (address x port). *) -type point = addr * port + read_buffer_size : int ; + read_queue_size : int option ; + write_queue_size : int option ; + incoming_app_message_queue_size : int option ; + incoming_message_queue_size : int option ; + outgoing_message_queue_size : int option ; -let point_encoding = - let open Data_encoding in - let open Ipaddr in - conv - (fun (addr, port) -> - (match addr with - | V4 v4 -> V4.to_bytes v4 - | V6 v6 -> V6.to_bytes v6), port) - (fun (addr, port) -> - (match String.length addr with - | 4 -> V4 (V4.of_bytes_exn addr) - | 16 -> V6 (V6.of_bytes_exn addr) - | _ -> Pervasives.failwith "point_encoding"), port) - (obj2 - (req "addr" string) - (req "port" int16)) +} -type 'msg encoding = Encoding : { - tag: int ; - encoding: 'a Data_encoding.t ; - wrap: 'a -> 'msg ; - unwrap: 'msg -> 'a option ; - max_length: int option ; - } -> 'msg encoding +let create_scheduler limits = + P2p_io_scheduler.create + ~read_buffer_size:limits.read_buffer_size + ?max_upload_speed:limits.max_upload_speed + ?max_download_speed:limits.max_download_speed + ?read_queue_size:limits.read_queue_size + ?write_queue_size:limits.write_queue_size + () -module type PARAMS = sig +let create_connection_pool config limits meta_cfg msg_cfg io_sched = + let pool_cfg = { + P2p_connection_pool.identity = config.identity ; + proof_of_work_target = config.proof_of_work_target ; + listening_port = config.listening_port ; + trusted_points = config.trusted_points ; + peers_file = config.peers_file ; + closed_network = config.closed_network ; + min_connections = limits.min_connections ; + max_connections = limits.max_connections ; + max_incoming_connections = limits.max_incoming_connections ; + authentification_timeout = limits.authentification_timeout ; + incoming_app_message_queue_size = limits.incoming_app_message_queue_size ; + incoming_message_queue_size = limits.incoming_message_queue_size ; + outgoing_message_queue_size = limits.outgoing_message_queue_size ; + } + in + let pool = + P2p_connection_pool.create pool_cfg meta_cfg msg_cfg io_sched in + pool - (** Type of message used by higher layers *) - type msg +let bounds ~min ~expected ~max = + assert (min <= expected) ; + assert (expected <= max) ; + let step_min = + (expected - min) / 3 + and step_max = + (max - expected) / 3 in + { P2p_maintenance.min_threshold = min + step_min ; + min_target = min + 2 * step_min ; + max_target = max - 2 * step_max ; + max_threshold = max - step_max ; + } - val encodings : msg encoding list +let may_create_discovery_worker _config pool = + Some (P2p_discovery.create pool) - (** Type of metadata associated to an identity *) - type metadata +let create_maintenance_worker limits pool disco = + let bounds = + bounds + limits.min_connections + limits.expected_connections + limits.max_connections + in + P2p_maintenance.run + ~connection_timeout:limits.authentification_timeout bounds pool disco - val initial_metadata : metadata - val metadata_encoding : metadata Data_encoding.t - val score : metadata -> float - - (** High level protocol(s) talked by the peer. When two peers - initiate a connection, they exchange their list of supported - versions. The chosen one, if any, is the maximum common one (in - lexicographic order) *) - val supported_versions : version list +let may_create_welcome_worker config limits pool = + match config.listening_port with + | None -> Lwt.return None + | Some port -> + P2p_welcome.run + ~backlog:limits.backlog pool + ?addr:config.listening_addr port >>= fun w -> + Lwt.return (Some w) +module type MESSAGE = sig + type t + val encoding : t P2p_connection_pool.encoding list + val supported_versions : Version.t list end -module Make (P: PARAMS) = struct +module type METADATA = sig + type t + val initial : t + val encoding : t Data_encoding.t + val score : t -> float +end - (* Low-level network protocol messages (internal). The protocol is - completely symmetrical and asynchronous. First both peers must - present their credentials with a [Connect] message, then any - combination of the other messages can be received at any time. An - exception is the [Disconnect] message, which should mark the end of - transmission (and needs not being replied). *) - type msg = - | Connect of { - gid : string ; - port : int option ; - versions : version list ; - public_key : Crypto_box.public_key ; - proof_of_work : Crypto_box.nonce ; - message_nonce : Crypto_box.nonce ; +module Make (Message : MESSAGE) (Metadata : METADATA) = struct + + let meta_cfg = { + P2p_connection_pool.encoding = Metadata.encoding ; + initial = Metadata.initial ; + } + and msg_cfg = { + P2p_connection_pool.encoding = Message.encoding ; + versions = Message.supported_versions ; + } + + type connection = (Message.t, Metadata.t) P2p_connection_pool.connection + + module Real = struct + + type net = { + config: config ; + limits: limits ; + io_sched: P2p_io_scheduler.t ; + pool: (Message.t, Metadata.t) P2p_connection_pool.t ; + discoverer: P2p_discovery.t option ; + maintenance: Metadata.t P2p_maintenance.t ; + welcome: P2p_welcome.t option ; + } + + let create ~config ~limits = + let io_sched = create_scheduler limits in + create_connection_pool + config limits meta_cfg msg_cfg io_sched >>= fun pool -> + let discoverer = may_create_discovery_worker config pool in + let maintenance = create_maintenance_worker limits pool discoverer in + may_create_welcome_worker config limits pool >>= fun welcome -> + Lwt.return { + config ; + limits ; + io_sched ; + pool ; + discoverer ; + maintenance ; + welcome ; } - | Disconnect - | Bootstrap - | Advertise of point list - | Message of P.msg - let msg_encoding = - let open Data_encoding in - union ~tag_size:`Uint16 - ([ case ~tag:0x00 - (obj6 - (req "gid" (Fixed.string gid_length)) - (req "port" uint16) - (req "pubkey" Crypto_box.public_key_encoding) - (req "proof_of_work" Crypto_box.nonce_encoding) - (req "message_nonce" Crypto_box.nonce_encoding) - (req "versions" (Variable.list version_encoding))) - (function - | Connect { gid ; port ; public_key ; - proof_of_work ; message_nonce ; versions } -> - let port = match port with None -> 0 | Some port -> port in - Some (gid, port, public_key, - proof_of_work, message_nonce, versions) - | _ -> None) - (fun (gid, port, public_key, - proof_of_work, message_nonce, versions) -> - let port = if port = 0 then None else Some port in - Connect { gid ; port ; versions ; - public_key ; proof_of_work ; message_nonce }); - case ~tag:0x01 null - (function Disconnect -> Some () | _ -> None) - (fun () -> Disconnect); - case ~tag:0x02 null - (function Bootstrap -> Some () | _ -> None) - (fun () -> Bootstrap); - case ~tag:0x03 (Variable.list point_encoding) - (function Advertise points -> Some points | _ -> None) - (fun points -> Advertise points); - ] @ - ListLabels.map P.encodings - ~f:(function Encoding { tag ; encoding ; wrap ; unwrap } -> - case ~tag encoding - (function Message msg -> unwrap msg | _ -> None) - (fun msg -> Message (wrap msg)))) + let gid { config } = config.identity.gid - let hdrlen = 2 - let maxlen = hdrlen + 2 lsl 16 + let maintain { maintenance } () = + P2p_maintenance.maintain maintenance - (* read a message from a TCP socket *) - let recv_msg ?(uncrypt = (fun buf -> Some buf)) fd buf = - Lwt.catch begin fun () -> - assert (MBytes.length buf >= 2 lsl 16) ; - Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () -> - let len = EndianBigstring.BigEndian.get_uint16 buf 0 in - (* TODO timeout read ??? *) - Lwt_utils.read_mbytes ~len fd buf >>= fun () -> - let buf = MBytes.sub buf 0 len in - match uncrypt buf with - | None -> - (* TODO track invalid message *) - Error_monad.fail Decipher_error - | Some buf -> - match Data_encoding.Binary.of_bytes msg_encoding buf with - | None -> - (* TODO track invalid message *) - Error_monad.fail Encoding_error - | Some msg -> - Error_monad.return (len, msg) - end - (fun exn -> Lwt.return @@ Error_monad.error_exn exn) + let roll _net () = Lwt.return_unit (* TODO implement *) - (* send a message over a TCP socket *) - let send_msg ?crypt fd buf msg = - Lwt.catch begin fun () -> - match crypt, Data_encoding.Binary.write msg_encoding msg buf hdrlen with - | _, None -> Error_monad.fail Encoding_error - | None, Some len -> - if len > maxlen then Error_monad.fail Message_too_big - else begin - EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; - (* TODO timeout write ??? *) - Lwt_utils.write_mbytes ~len fd buf >>= fun () -> - Error_monad.return len - end - | Some crypt, Some len -> - let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in - let len = MBytes.length encbuf in - if len > maxlen then Error_monad.fail Message_too_big - else begin - let lenbuf = MBytes.create 2 in - EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; - Lwt_utils.write_mbytes fd lenbuf >>= fun () -> - Lwt_utils.write_mbytes fd encbuf >>= fun () -> - Error_monad.return len - end - end - (fun exn -> Lwt.return @@ Error_monad.error_exn exn) + (* returns when all workers have shutted down in the opposite + creation order. *) + let shutdown net () = + Lwt_utils.may ~f:P2p_welcome.shutdown net.welcome >>= fun () -> + P2p_maintenance.shutdown net.maintenance >>= fun () -> + Lwt_utils.may ~f:P2p_discovery.shutdown net.discoverer >>= fun () -> + P2p_connection_pool.destroy net.pool >>= fun () -> + P2p_io_scheduler.shutdown net.io_sched - (* The (internal) type of network events, those dispatched from peer - workers to the net and others internal to net workers. *) - type event = - | Disconnected of peer - | Bootstrap of peer - | Recv of peer * P.msg - | Peers of point list - | Contact of point * LU.file_descr - | Connected of peer - | Shutdown + let connections { pool } () = + P2p_connection_pool.fold_connections pool + ~init:[] ~f:(fun _gid c acc -> c :: acc) + let find_connection { pool } gid = + P2p_connection_pool.Gids.find_connection pool gid + let connection_info _net conn = + P2p_connection_pool.connection_info conn + let connection_stat _net conn = + P2p_connection_pool.connection_stat conn + let global_stat { pool } () = + P2p_connection_pool.pool_stat pool + let set_metadata { pool } conn meta = + P2p_connection_pool.Gids.set_metadata pool conn meta + let get_metadata { pool } conn = + P2p_connection_pool.Gids.get_metadata pool conn - (* A peer handle, as a record-encoded object, abstract from the - outside world. A hidden Lwt worker is associated to a peer at its - creation and is killed using the disconnect callback by net - workers (on shutdown of during maintenance). *) - and peer = { - gid : gid ; - public_key : Crypto_box.public_key ; - point : point ; - listening_port : port option ; - version : version ; - last_seen : unit -> float ; - disconnect : unit -> unit Lwt.t; - send : msg -> unit Lwt.t ; - try_send : msg -> bool ; - reader : event Lwt_pipe.t ; - writer : msg Lwt_pipe.t ; - total_sent : unit -> int ; - total_recv : unit -> int ; - current_inflow : unit -> float ; - current_outflow : unit -> float ; - } + let rec recv net () = + let pipes = + P2p_connection_pool.fold_connections + net.pool ~init:[] ~f:begin fun _gid conn acc -> + (P2p_connection_pool.is_readable conn >>= function + | Ok () -> Lwt.return conn + | Error _ -> Lwt_utils.never_ending) :: acc + end in + Lwt.pick pipes >>= fun conn -> + P2p_connection_pool.read conn >>= function + | Ok msg -> + Lwt.return (conn, msg) + | Error _ -> + Lwt_unix.yield () >>= fun () -> + recv net () - type peer_info = { - gid : gid ; - addr : addr ; - port : port ; - version : version ; - total_sent : int ; - total_recv : int ; - current_inflow : float ; - current_outflow : float ; - } + let send _net c m = + P2p_connection_pool.write c m >>= function + | Ok () -> Lwt.return_unit + | Error _ -> Lwt.fail End_of_file (* temporary *) + + let try_send _net c v = + match P2p_connection_pool.write_now c v with + | Ok v -> v + | Error _ -> false + + let broadcast { pool } msg = P2p_connection_pool.write_all pool msg + + end + + module Fake = struct + + let id = Identity.generate Crypto_box.default_target + let empty_stat = { + Stat.total_sent = 0 ; + total_recv = 0 ; + current_inflow = 0 ; + current_outflow = 0 ; + } + let connection_info = { + Connection_info.incoming = false ; + gid = id.gid ; + id_point = (Ipaddr.V6.unspecified, None) ; + remote_socket_port = 0 ; + versions = [] ; + } + + end - (* A net handler, as a record-encoded object, abstract from the - outside world. Hidden Lwt workers are associated to a net at its - creation and can be killed using the shutdown callback. *) type net = { - gid : gid ; - recv_from : unit -> (peer * P.msg) Lwt.t ; - send_to : peer -> P.msg -> unit Lwt.t ; - try_send_to : peer -> P.msg -> bool ; - broadcast : P.msg -> unit ; - blacklist : ?duration:float -> addr -> unit ; - whitelist : peer -> unit ; + gid : Gid.t ; maintain : unit -> unit Lwt.t ; roll : unit -> unit Lwt.t ; shutdown : unit -> unit Lwt.t ; - peers : unit -> peer list ; - find_peer : gid -> peer option ; - peer_info : peer -> peer_info ; - set_metadata : gid -> P.metadata -> unit ; - get_metadata : gid -> P.metadata option ; + connections : unit -> connection list ; + find_connection : Gid.t -> connection option ; + connection_info : connection -> Connection_info.t ; + connection_stat : connection -> Stat.t ; + global_stat : unit -> Stat.t ; + get_metadata : Gid.t -> Metadata.t option ; + set_metadata : Gid.t -> Metadata.t -> unit ; + recv : unit -> (connection * Message.t) Lwt.t ; + send : connection -> Message.t -> unit Lwt.t ; + try_send : connection -> Message.t -> bool ; + broadcast : Message.t -> unit ; } - (* Run-time point-or-gid indexed storage, one point is bound to at - most one gid, which is the invariant we want to keep both for the - connected peers table and the known peers one *) - module GidMap = Map.Make (struct type t = gid let compare = compare end) - module GidSet = Set.Make (struct type t = gid let compare = compare end) - module PointMap = Map.Make (struct type t = point let compare = compare end) - module PointSet = Set.Make (struct type t = point let compare = compare end) - module PeerMap : sig - type 'a t - val empty : 'a t - val by_point : point -> 'a t -> 'a - val by_gid : gid -> 'a t -> 'a - val gid_by_point : point -> 'a t -> gid option - val point_by_gid : gid -> 'a t -> point - val mem_by_point : point -> 'a t -> bool - val mem_by_gid : gid -> 'a t -> bool - val remove_by_point : point -> 'a t -> 'a t - val remove_by_gid : gid -> 'a t -> 'a t - val update : point -> ?gid : gid -> 'a -> 'a t -> 'a t - val fold : (point -> gid option -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b - val iter : (point -> gid option -> 'a -> unit) -> 'a t -> unit - val bindings : 'a t -> (point * gid option * 'a) list - val cardinal : 'a t -> int - end = struct - type 'a t = - { by_point : (gid option * 'a) PointMap.t ; - by_gid : (point * 'a) GidMap.t } - - let empty = - { by_point = PointMap.empty ; - by_gid = GidMap.empty } - - let by_point point { by_point } = - let (_, v) = PointMap.find point by_point in v - - let by_gid gid { by_gid } = - let (_, v) = GidMap.find gid by_gid in v - - let gid_by_point point { by_point } = - let (gid, _) = PointMap.find point by_point in gid - - let point_by_gid gid { by_gid } = - let (point, _) = GidMap.find gid by_gid in point - - let mem_by_point point { by_point } = - PointMap.mem point by_point - - let mem_by_gid gid { by_gid } = - GidMap.mem gid by_gid - - let remove_by_point point ({ by_point ; by_gid } as map) = - try - let (gid, _) = PointMap.find point by_point in - { by_point = PointMap.remove point by_point ; - by_gid = match gid with - | None -> by_gid - | Some gid -> GidMap.remove gid by_gid } - with Not_found -> map - - let remove_by_gid gid ({ by_point ; by_gid } as map) = - try - let (point, _) = GidMap.find gid by_gid in - { by_point = PointMap.remove point by_point ; - by_gid = GidMap.remove gid by_gid } - with Not_found -> map - - let update point ?gid v map = - let { by_point ; by_gid } = - let map = remove_by_point point map in - match gid with Some gid -> remove_by_gid gid map | None -> map in - { by_point = PointMap.add point (gid, v) by_point ; - by_gid = match gid with Some gid -> GidMap.add gid (point, v) by_gid - | None -> by_gid } - - let fold f { by_point } init = - PointMap.fold - (fun point (gid, v) r -> f point gid v r) by_point init - - let iter f { by_point } = - PointMap.iter - (fun point (gid, v) -> f point gid v) by_point - - let cardinal { by_point } = - PointMap.cardinal by_point - - let bindings map = - fold (fun point gid v l -> (point, gid, v) :: l) map [] - end - - (* Builds a peer and launches its associated worker. Takes a push - function for communicating with the main worker using events - (including the one sent when the connection is alive). Returns a - canceler. *) - let connect_to_peer - config limits my_gid my_public_key my_secret_key my_proof_of_work - socket (addr, port) control_events white_listed = - (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = Lwt_utils.canceler () in - (* a cancelable encrypted reception *) - let recv ?uncrypt buf = - Lwt.pick [ recv_msg ?uncrypt socket buf ; - (cancelation () >>= fun () -> Error_monad.fail Canceled) ] - in - (* First step: send and receive credentials, makes no difference - whether we're trying to connect to a peer or checking an incoming - connection, both parties must first present themselves. *) - let rec connect buf = - let local_nonce = Crypto_box.random_nonce () in - send_msg socket buf - (Connect { gid = my_gid ; - public_key = my_public_key ; - proof_of_work = my_proof_of_work ; - message_nonce = local_nonce ; - port = config.incoming_port ; - versions = P.supported_versions }) >>= fun _ -> - Lwt.pick - [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> Error_monad.fail Timeout ) ; - recv buf ] >>= function - | Error [Timeout] - | Error [Canceled] - | Error [Exn End_of_file] -> - debug "(%a) Closed connection to %a:%d." - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | Error err -> - log_error "(%a) error receiving from %a:%d: %a" - pp_gid my_gid Ipaddr.pp_hum addr port - Error_monad.pp_print_error err ; - cancel () - | Ok (_, (Connect { gid; port = listening_port; versions ; - public_key ; proof_of_work ; message_nonce })) -> - debug "(%a) connection requested from %a @@ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - let work_proved = - Crypto_box.check_proof_of_work - public_key proof_of_work Crypto_box.default_target in - if not work_proved then begin - debug "connection rejected (invalid proof of work)" ; - cancel () - end else begin - match common_version P.supported_versions versions with - | None -> - debug - "(%a) connection rejected (incompatible versions) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | Some version -> - if config.closed_network then - match listening_port with - | Some port when white_listed (addr, port) -> - connected - buf local_nonce version gid - public_key message_nonce listening_port - | Some port -> - debug - "(%a) connection rejected (out of the closed network) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | None -> - debug - "(%a) connection rejected (out of the closed network) from %a:unknown" - pp_gid my_gid Ipaddr.pp_hum addr ; - cancel () - else - connected - buf local_nonce version gid - public_key message_nonce listening_port - end - | Ok (_, Disconnect) -> - debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | _ -> - debug "(%a) connection rejected (bad connection request) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - - (* Them we can build the net object and launch the worker. *) - and connected buf local_nonce version gid public_key nonce listening_port = - (* net object state *) - let last = ref (Unix.gettimeofday ()) in - let local_nonce = ref local_nonce in - let remote_nonce = ref nonce in - let received = ref 0 in - let sent = ref 0 in - (* net object callbaks *) - let last_seen () = !last in - let get_nonce nonce = - let current_nonce = !nonce in - nonce := Crypto_box.increment_nonce !nonce ; - current_nonce in - let disconnect () = cancel () in - let crypt buf = - let nonce = get_nonce remote_nonce in - Crypto_box.box my_secret_key public_key buf nonce in - let writer = Lwt_pipe.create ~size:2 () in - let send p = Lwt_pipe.push writer p in - let try_send p = Lwt_pipe.push_now writer p in - let reader = Lwt_pipe.create ~size:2 () in - let total_sent () = !sent in - let total_recv () = !received in - let current_inflow () = 0. in - let current_outflow () = 0. in - (* net object construction *) - let peer = { gid ; public_key ; point = (addr, port) ; - listening_port ; version ; last_seen ; - disconnect ; send ; try_send ; reader ; writer ; - total_sent ; total_recv ; current_inflow ; current_outflow } in - let uncrypt buf = - let nonce = get_nonce local_nonce in - match Crypto_box.box_open my_secret_key public_key buf nonce with - | None -> - debug "(%a) cannot decrypt message (from peer) %a @ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - None - | Some _ as res -> res in - (* The message reception loop. *) - let rec receiver () = - recv ~uncrypt buf >>= function - | Error err -> - debug "(%a) error receiving: %a" - pp_gid my_gid Error_monad.pp_print_error err ; - cancel () - | Ok (size, msg) -> - received := !received + size; - match msg with - | Connect _ - | Disconnect -> - debug "(%a) disconnected (by peer) %a @@ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - cancel () - | Bootstrap -> Lwt_pipe.push reader (Bootstrap peer) >>= receiver - | Advertise peers -> Lwt_pipe.push reader (Peers peers) >>= receiver - | Message msg -> Lwt_pipe.push reader (Recv (peer, msg)) >>= receiver - in - let rec sender () = - Lwt_pipe.pop peer.writer >>= fun msg -> - send_msg ~crypt socket buf msg >>= function - | Ok size -> - sent := !sent + size; - sender () - | Error err -> - debug "(%a) error sending to %a: %a" - pp_gid my_gid pp_gid gid Error_monad.pp_print_error err ; - cancel () - in - (* Events for the main worker *) - Lwt_pipe.push control_events (Connected peer) >>= fun () -> - on_cancel (fun () -> Lwt_pipe.push control_events (Disconnected peer)) ; - (* Launch the workers *) - Lwt.join [receiver () ; sender ()] - in - let buf = MBytes.create maxlen in - on_cancel (fun () -> - (* send_msg ~crypt socket buf Disconnect >>= fun _ -> *) - LU.close socket >>= fun _ -> - Lwt.return_unit) ; - let worker_name = - Format.asprintf - "(%a) connection handler for %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port in - ignore (Lwt_utils.worker worker_name - ~run:(fun () -> connect buf) ~cancel) ; - (* return the canceler *) - cancel - - - (* JSON format for on-disk peers cache file *) - let addr_encoding = - let open Data_encoding in - splitted - ~json: - (conv - Ipaddr.to_string - (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) - string) - ~binary: - (union ~tag_size:`Uint8 - [ case ~tag:4 - (Fixed.string 4) - (fun ip -> Utils.map_option Ipaddr.V4.to_bytes (Ipaddr.to_v4 ip) ) - (fun b -> Ipaddr.(V4 (V4.of_bytes_exn b))) ; - case ~tag:6 - (Fixed.string 32) - (fun ip -> Some (Ipaddr.V6.to_bytes (Ipaddr.to_v6 ip))) - (fun b -> Ipaddr.(V6 (V6.of_bytes_exn b))) ; - ]) - - let peers_file_encoding = - let open Data_encoding in - obj5 - (req "gid" string) - (req "public_key" Crypto_box.public_key_encoding) - (req "secret_key" Crypto_box.secret_key_encoding) - (req "proof_of_work" Crypto_box.nonce_encoding) - (req "peers" - (obj3 - (req "known" - (list (obj3 - (req "addr" addr_encoding) - (req "port" int31) - (opt "infos" - (obj4 - (req "connections" int31) - (req "lastSeen" float) - (req "gid" string) - (req "public_key" - Crypto_box.public_key_encoding)))))) - (req "blacklisted" - (list (obj2 - (req "addr" addr_encoding) - (req "until" float)))) - (req "whitelisted" - (list (obj2 - (req "addr" addr_encoding) - (req "port" int31)))))) - - (* Info on peers maintained between connections *) - type source = { - unreachable_since : float option; - connections : (int * float * Crypto_box.public_key) option ; - white_listed : bool ; - meta : P.metadata ; - } - - (* Ad hoc comparison on sources such as good source < bad source *) - let compare_sources s1 s2 = - match s1.white_listed, s2.white_listed with - | true, false -> -1 | false, true -> 1 - | _, _ -> - match s1.unreachable_since, s2.unreachable_since with - | None, Some _ -> -1 | Some _, None -> 1 - | _, _ -> - match s1.connections, s2.connections with - | Some _, None -> -1 | None, Some _ -> 1 | None, None -> 0 - | Some (n1, t1, _), Some (n2, t2, _) -> - if n1 = n2 then compare t2 t1 - else compare n2 n1 - - (* A store for blacklisted addresses (we ban any peer on a blacklisted - address, which is the policy that seems to make the most sense) *) - module BlackList = Map.Make (struct type t = addr let compare = compare end) - - (* A good random string so it is probably unique on the network *) - let fresh_gid () = - Bytes.to_string @@ Sodium.Random.Bytes.generate gid_length - - (* The (fixed size) broadcast frame. *) - let discovery_message_encoding = - let open Data_encoding in - tup3 (Fixed.string 8) (Fixed.string gid_length) int16 - - let discovery_message gid port = - Data_encoding.Binary.to_bytes - discovery_message_encoding - ("DISCOVER", gid, port) - - (* Broadcast frame verifier. *) - let answerable_discovery_message msg my_gid when_ok when_not = - match msg with - | Some ("DISCOVER", gid, port) when gid <> my_gid -> when_ok gid port - | _ -> when_not () - - let string_of_unix_exn = function - | Unix.Unix_error (err, fn, _) -> "in " ^ fn ^ ", " ^ Unix.error_message err - | exn -> Printexc.to_string exn - - (* Launch an answer machine for the discovery mechanism, takes a - callback to fill the answers and returns a canceler function *) - let discovery_answerer my_gid disco_port cancelation callback = - (* init a UDP listening socket on the broadcast canal *) - Lwt.catch begin fun () -> - let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.(setsockopt main_socket SO_BROADCAST true) ; - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; - Lwt.return (Some main_socket) - end - (fun exn -> - debug "(%a) will not listen to discovery requests (%s)" - pp_gid my_gid (string_of_unix_exn exn) ; - Lwt.return_none) >>= function - | None -> Lwt.return_unit - | Some main_socket -> - (* the answering function *) - let rec step () = - let buffer = discovery_message my_gid 0 in - let len = MBytes.length buffer in - Lwt.pick - [ (cancelation () >>= fun () -> Lwt.return_none) ; - (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> - Lwt.return (Some r)) ] >>= function - | None -> Lwt.return_unit - | Some (len', LU.ADDR_INET (addr, _)) when len' = len -> - answerable_discovery_message - (Data_encoding.Binary.of_bytes - discovery_message_encoding buffer) - my_gid - (fun _ port -> - Lwt.catch begin fun () -> - callback addr port - end - (fun _ -> (* ignore errors *) Lwt.return_unit) >>= fun () -> - step ()) - step - | Some _ -> step () - in step () - - (* Sends dicover messages into space in an exponentially delayed loop, - restartable using a condition *) - let discovery_sender my_gid disco_port inco_port cancelation restart = - let msg = discovery_message my_gid inco_port in - let rec loop delay n = - Lwt.catch begin fun () -> - let socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.setsockopt socket LU.SO_BROADCAST true ; - let broadcast_ipv4 = Unix.inet_addr_of_string "255.255.255.255" in - LU.connect socket - LU.(ADDR_INET (broadcast_ipv4, disco_port)) >>= fun () -> - Lwt_utils.write_mbytes socket msg >>= fun _ -> - LU.close socket - end - (fun _ -> - debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; - Lwt.return_unit) >>= fun () -> - Lwt.pick - [ (LU.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ; - (cancelation () >>= fun () -> Lwt.return_none) ; - (LC.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ] - >>= function - | Some (delay, n) when n = 10 -> - loop delay 9 - | Some (delay, n) -> - loop (delay *. 2.) n - | None -> Lwt.return_unit - in loop 0.2 1 - - (* Main network creation and initialisation function *) let bootstrap ~config ~limits = - (* we need to ignore SIGPIPEs *) - Sys.(set_signal sigpipe Signal_ignore) ; - (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = Lwt_utils.canceler () in - (* create the internal event pipe *) - let events = Lwt_pipe.create ~size:100 () in - (* create the external message pipe *) - let messages = Lwt_pipe.create ~size:100 () in - (* fill the known peers pools from last time *) - Data_encoding_ezjsonm.read_file config.peers_file >>= fun res -> - let known_peers, black_list, my_gid, - my_public_key, my_secret_key, my_proof_of_work = - let init_peers () = - let my_gid = - fresh_gid () in - let (my_secret_key, my_public_key, _) = - Crypto_box.random_keypair () in - let my_proof_of_work = - Crypto_box.generate_proof_of_work - my_public_key Crypto_box.default_target in - let known_peers = - let source = { unreachable_since = None ; - connections = None ; - white_listed = true ; - meta = P.initial_metadata ; - } - in - List.fold_left - (fun r point -> PeerMap.update point source r) - PeerMap.empty config.known_peers in - let black_list = - BlackList.empty in - known_peers, black_list, my_gid, - my_public_key, my_secret_key, my_proof_of_work in - match res with - | None -> - let known_peers, black_list, my_gid, - my_public_key, my_secret_key, my_proof_of_work = init_peers () in - debug "(%a) peer cache initiated" pp_gid my_gid ; - ref known_peers, ref black_list, my_gid, - my_public_key, my_secret_key, my_proof_of_work - | Some json -> - match Data_encoding.Json.destruct peers_file_encoding json with - | exception _ -> - let known_peers, black_list, my_gid, - my_public_key, my_secret_key, my_proof_of_work = init_peers () in - debug "(%a) peer cache reset" pp_gid my_gid ; - ref known_peers, ref black_list, - my_gid, my_public_key, my_secret_key, my_proof_of_work - | (my_gid, my_public_key, my_secret_key, my_proof_of_work, (k, b, w)) -> - let white_list = - List.fold_right PointSet.add w PointSet.empty in - let known_peers = - List.fold_left - (fun r (addr, port, infos) -> - match infos with - | None -> - let source = - { unreachable_since = None ; - connections = None ; - white_listed = true ; - meta = P.initial_metadata ; } in - PeerMap.update (addr, port) source r - | Some (c, t, gid, pk) -> - let source = - { unreachable_since = None ; - connections = Some (c, t, pk) ; - white_listed = PointSet.mem (addr, port) white_list ; - meta = P.initial_metadata ; } in - PeerMap.update (addr, port) ~gid source r) - PeerMap.empty k in - let black_list = - List.fold_left - (fun r (a, d) -> BlackList.add a d r) - BlackList.empty b in - debug "(%a) peer cache loaded" pp_gid my_gid ; - ref known_peers, ref black_list, - my_gid, my_public_key, my_secret_key, my_proof_of_work - in - (* some peer reachability predicates *) - let black_listed (addr, _) = - BlackList.mem addr !black_list in - let white_listed point = - try (PeerMap.by_point point !known_peers).white_listed - with Not_found -> false in - let grey_listed point = - try match (PeerMap.by_point point !known_peers).unreachable_since with - | None -> false | Some t -> Unix.gettimeofday () -. t > 5. - with Not_found -> false in - (* save the cache at exit *) - on_cancel (fun () -> - (* save the known peers cache *) - let json = - Data_encoding.Json.construct peers_file_encoding @@ - (my_gid, - my_public_key, - my_secret_key, - my_proof_of_work, - PeerMap.fold - (fun (addr, port) gid source (k, b, w) -> - let infos = match gid, source.connections with - | Some gid, Some (n, t, pk) -> Some (n, t, gid, pk) - | _ -> None in - ((addr, port, infos) :: k, - b, - if source.white_listed then (addr, port) :: w else w)) - !known_peers ([], BlackList.bindings !black_list, [])) - in - Data_encoding_ezjsonm.write_file config.peers_file json >>= fun _ -> - debug "(%a) peer cache saved" pp_gid my_gid ; - Lwt.return_unit) ; - (* storage of active and not yet active peers *) - let incoming = ref PointMap.empty in - let connected = ref PeerMap.empty in - (* peer welcoming (accept) loop *) - let welcome () = - match config.incoming_port with - | None -> (* no input port => no welcome worker *) Lwt.return_unit - | Some port -> - (* open port for incoming connexions *) - let addr = Unix.inet6_addr_any in - Lwt.catch begin fun () -> - let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (addr, port))) ; - LU.listen main_socket limits.max_connections ; - Lwt.return (Some main_socket) - end - (fun exn -> - debug "(%a) cannot accept incoming peers (%s)" - pp_gid my_gid (string_of_unix_exn exn) ; - Lwt.return_none) - >>= function - | None -> - (* FIXME: run in degraded mode, better exit ? *) - Lwt.return_unit - | Some main_socket -> - (* then loop *) - let rec step () = - Lwt.pick - [ ( LU.accept main_socket >>= fun (s, a) -> - Lwt.return (Some (s, a)) ) ; - ( cancelation () >>= fun _ -> - Lwt.return_none ) ] - >>= function - | None -> - LU.close main_socket - | Some (socket, addr) -> - match addr with - | LU.ADDR_INET (addr, port) -> - let addr = Ipaddr_unix.of_inet_addr addr in - Lwt_pipe.push events (Contact ((addr, port), socket)) >>= - step - | _ -> - Lwt.async (fun () -> LU.close socket) ; - step () - in step () - in - (* input maintenance events *) - let too_many_peers = LC.create () in - let too_few_peers = LC.create () in - let new_peer = LC.create () in - let new_contact = LC.create () in - let please_maintain = LC.create () in - let restart_discovery = LC.create () in - (* output maintenance events *) - let just_maintained = LC.create () in - (* maintenance worker, returns when [connections] peers are connected *) - let rec maintenance () = - Lwt.pick - [ ( LU.sleep 120. >>= fun () -> - Lwt.return_true) ; (* every two minutes *) - ( LC.wait please_maintain >>= fun () -> - Lwt.return_true) ; (* when asked *) - ( LC.wait too_few_peers >>= fun () -> - Lwt.return_true) ; (* limits *) - ( LC.wait too_many_peers >>= fun () -> - Lwt.return_true) ; - ( cancelation () >>= fun () -> - Lwt.return_false) ] >>= fun continue -> - let rec maintain () = - let n_connected = PeerMap.cardinal !connected in - if n_connected >= limits.expected_connections - && n_connected <= limits.max_connections then - (* end of maintenance when enough users have been reached *) - (LC.broadcast just_maintained () ; - debug "(%a) maintenance step ended" - pp_gid my_gid ; - maintenance ()) - else if n_connected < limits.expected_connections then - (* too few peers, try and contact many peers *) - let contact nb = - let contactable = - (* we sort sources by level (prefered first) *) - PeerMap.bindings !known_peers |> - List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> - (* remove the ones we're connect(ed/ing) to and the blacklisted *) - List.filter (fun (point, gid, source) -> - (not (black_listed point) || source.white_listed) - && not (grey_listed point) - && not (gid = Some my_gid) - && not (PeerMap.mem_by_point point !connected) - && not (PointMap.mem point !incoming) - && match gid with | None -> true | Some gid -> - not (PeerMap.mem_by_gid gid !connected)) in - let rec do_contact_loop strec = - match strec with - | 0, _ -> Lwt.return_true - | _, [] -> - Lwt.return_false (* we didn't manage to contact enough peers *) - | nb, ((addr, port), gid, source) :: tl -> - (* we try to open a connection *) - let socket = - let open LU in - let open Ipaddr in - let family = - match addr with V4 _ -> PF_INET | V6 _ -> PF_INET6 in - socket family SOCK_STREAM 0 in - let uaddr = Ipaddr_unix.to_inet_addr addr in - Lwt.catch begin fun () -> - debug "(%a) trying to connect to %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - Lwt.pick - [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; - LU.connect socket (LU.ADDR_INET (uaddr, port)) - ] >>= fun () -> - debug "(%a) connected to %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port; - Lwt_pipe.push events - (Contact ((addr, port), socket)) >>= fun () -> - Lwt.return (nb - 1) - end - (fun exn -> - debug "(%a) connection failed to %a:%d (%s)" - pp_gid my_gid Ipaddr.pp_hum addr port - (string_of_unix_exn exn); - (* if we didn't succes, we greylist it *) - let now = Unix.gettimeofday () in - known_peers := - PeerMap.update (addr, port) ?gid - { source with unreachable_since = Some now } - !known_peers ; - LU.close socket >>= fun () -> - Lwt.return nb) >>= fun nrec -> - do_contact_loop (nrec, tl) - in do_contact_loop (nb, contactable) - in - let to_contact = limits.max_connections - n_connected in - debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ; - contact to_contact >>= function - | true -> (* enough contacts, now wait for connections *) - Lwt.pick - [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ; - (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ; - (cancelation () >>= fun () -> Lwt.return_false) ] - >>= fun continue -> - if continue then maintain () else Lwt.return_unit - | false -> (* not enough contacts, ask the pals of our pals, - discover the local network and then wait *) - LC.broadcast restart_discovery () ; - (PeerMap.iter - (fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap)) - !connected ; - Lwt.pick - [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ; - (LC.wait new_contact >>= fun _ -> Lwt.return_true) ; - (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ; - (cancelation () >>= fun () -> Lwt.return_false) ] - >>= fun continue -> - if continue then maintain () else Lwt.return_unit) - else - (* too many peers, start the russian roulette *) - let to_kill = n_connected - limits.max_connections in - debug "(%a) too many connections, will kill %d" pp_gid my_gid to_kill ; - snd (PeerMap.fold - (fun _ _ peer (i, t) -> - if i = 0 then (0, t) - else (i - 1, t >>= fun () -> peer.disconnect ())) - !connected (to_kill, Lwt.return_unit)) >>= fun () -> - (* and directly skip to the next maintenance request *) - LC.broadcast just_maintained () ; - debug "(%a) maintenance step ended" pp_gid my_gid ; - maintenance () - in - if continue then maintain () else Lwt.return_unit - in - (* select the peers to send on a bootstrap request *) - let bootstrap_peers () = - (* we sort peers by desirability *) - PeerMap.bindings !known_peers |> - List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |> - List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> - (* we simply send the first 50 (or less) known peers *) - List.fold_left - (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) - (50, []) |> snd - in - let next_peer_event () = - let rec peer_events () = - let peers = PeerMap.bindings !connected in - let current_peers_evts = - filter_map begin function - | _, Some gid, p -> Some (Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader) - | _ -> None - end peers - in - Lwt.choose [ - (LC.wait new_peer >>= fun _p -> peer_events ()); - Lwt.nchoose current_peers_evts; - ] - in - peer_events () >>= fun evts -> - let nb_evts = List.length evts in - let gid, evtqueue = List.nth evts (Random.int nb_evts) in - lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid gid >|= fun () -> - Lwt_pipe.pop_now_exn evtqueue - in - let rec peers () = - (* user event handling worker *) - Lwt.pick [ - next_peer_event () ; - cancelation () >>= fun () -> Lwt.return Shutdown ; - ] >>= fun event -> match event with - | Recv (peer, msg) -> Lwt_pipe.push messages (peer, msg) >>= peers - | msg -> Lwt_pipe.push events msg >>= peers - in - (* internal event handling worker *) - let rec admin () = - Lwt.pick - [ Lwt_pipe.pop events ; - cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> - match event with - | Recv _ -> - (* Invariant broken *) - Lwt.fail_with "admin: got a Recv message (broken invariant)" - | Disconnected peer -> - debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; - (* remove it from the tables *) - connected := PeerMap.remove_by_point peer.point !connected ; - if PeerMap.cardinal !connected < limits.min_connections then - LC.broadcast too_few_peers () ; - incoming := PointMap.remove peer.point !incoming ; - admin () - | Connected peer -> - incoming := PointMap.remove peer.point !incoming ; - let update_infos () = - (* we update our knowledge table according to the - reachable address given by the peer *) - match peer.listening_port with - | None -> () - | Some port -> - let point = (fst peer.point, port) in - let update source = - (* delete previous infos about this address / gid *) - known_peers := PeerMap.remove_by_point point !known_peers ; - known_peers := PeerMap.remove_by_gid peer.gid !known_peers ; - (* then assign *) - known_peers := - PeerMap.update point ~gid:peer.gid source !known_peers - in update @@ - try match PeerMap.by_gid peer.gid !known_peers with - | { connections = None ; white_listed } -> - { connections = - Some (1, Unix.gettimeofday (), peer.public_key) ; - unreachable_since = None ; - white_listed ; - meta = P.initial_metadata } - | { connections = Some (n, _, _) ; white_listed } -> - { connections = - Some (n + 1, Unix.gettimeofday (), peer.public_key) ; - unreachable_since = None ; - white_listed ; - meta = P.initial_metadata } - with Not_found -> - { connections = - Some (1, Unix.gettimeofday (), peer.public_key) ; - unreachable_since = None ; - white_listed = white_listed point ; - meta = P.initial_metadata } - in - (* if it's me, it's probably not me *) - if my_gid = peer.gid then begin - debug "(%a) rejected myself from %a:%d" - pp_gid my_gid Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; - (* now that I know my address, I can save this info to - prevent future reconnections to myself *) - update_infos () ; - Lwt.async peer.disconnect - end - (* keep only one connection to each node by checking its gid *) - else if PeerMap.mem_by_gid peer.gid !connected then begin - debug "(%a) rejected already connected peer %a @@ %a:%d" - pp_gid my_gid pp_gid peer.gid - Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; - update_infos () ; - Lwt.async peer.disconnect - end else begin - debug "(%a) connected peer %a @@ %a:%d" - pp_gid my_gid pp_gid peer.gid - Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; - update_infos () ; - connected := - PeerMap.update peer.point ~gid:peer.gid peer !connected ; - if PeerMap.cardinal !connected > limits.max_connections then - LC.broadcast too_many_peers () ; - LC.broadcast new_peer peer - end ; - admin () - | Contact ((addr, port), socket) -> - (* we do not check the credentials at this stage, since they - could change from one connection to the next *) - if PointMap.mem (addr, port) !incoming - || PeerMap.mem_by_point (addr, port) !connected - || BlackList.mem addr !black_list then - LU.close socket >>= fun () -> - admin () - else - let canceler = - connect_to_peer - config limits my_gid my_public_key my_secret_key my_proof_of_work - socket (addr, port) events white_listed in - debug "(%a) incoming peer @@ %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - incoming := PointMap.add (addr, port) canceler !incoming ; - admin () - | Bootstrap peer -> - let sample = bootstrap_peers () in - Lwt.async (fun () -> peer.send (Advertise sample)) ; - admin () - | Peers peers -> - List.iter - (fun point -> - if not (PeerMap.mem_by_point point !known_peers) then - let source = - { unreachable_since = None ; - connections = None ; - white_listed = false ; - meta = P.initial_metadata } in - known_peers := PeerMap.update point source !known_peers ; - LC.broadcast new_contact point) - peers ; - admin () - | Shutdown -> - Lwt.return_unit - in - (* blacklist filter *) - let rec unblock () = - Lwt.pick - [ (Lwt_unix.sleep 20. >>= fun _ -> Lwt.return_true) ; - (cancelation () >>= fun () -> Lwt.return_false) ] >>= fun continue -> - if continue then - let now = Unix.gettimeofday () in - black_list := BlackList.fold - (fun addr d map -> if d < now then map else BlackList.add addr d map) - !black_list BlackList.empty ; - known_peers := - PeerMap.fold (fun point gid source map -> - let source = - match source.unreachable_since with - | Some t when now -. t < 20. -> source - | _ -> { source with unreachable_since = None } in - PeerMap.update point ?gid source map) - !known_peers PeerMap.empty ; - unblock () - else Lwt.return_unit - in - (* launch all workers *) - let welcome = - Lwt_utils.worker - (Format.asprintf "(%a) welcome" pp_gid my_gid) - welcome cancel in - let maintenance = - Lwt_utils.worker - (Format.asprintf "(%a) maintenance" pp_gid my_gid) - maintenance cancel in - let peers_worker = - Lwt_utils.worker - (Format.asprintf "(%a) peers" pp_gid my_gid) - peers cancel in - let admin = - Lwt_utils.worker - (Format.asprintf "(%a) admin" pp_gid my_gid) - admin cancel in - let unblock = - Lwt_utils.worker - (Format.asprintf "(%a) unblacklister" pp_gid my_gid) - unblock cancel in - let discovery_answerer = - let callback inet_addr port = - let addr = Ipaddr_unix.of_inet_addr inet_addr in - (* do not reply to ourselves or connected peers *) - if not (PeerMap.mem_by_point (addr, port) !connected) - && (try match PeerMap.gid_by_point (addr, port) !known_peers with - | Some gid -> not (PeerMap.mem_by_gid gid !connected) && not (my_gid = gid) - | None -> true - with Not_found -> true) - then - (* connect if we need peers *) - if PeerMap.cardinal !connected >= limits.expected_connections then - Lwt_pipe.push events (Peers [ addr, port ]) - else - let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.connect socket LU.(ADDR_INET (inet_addr, port)) >>= fun () -> - Lwt_pipe.push events (Contact ((addr, port), socket)) - else Lwt.return_unit - in - match config.discovery_port with - | None -> Lwt.return_unit - | Some disco_port -> - Lwt_utils.worker - (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) - (fun () -> discovery_answerer my_gid disco_port cancelation callback) - cancel - in - let discovery_sender = - match config.incoming_port, config.discovery_port with - | Some inco_port, Some disco_port -> - let sender () = - discovery_sender - my_gid disco_port inco_port cancelation restart_discovery in - Lwt_utils.worker - (Format.asprintf "(%a) discovery sender" pp_gid my_gid) - sender cancel - | _ -> Lwt.return_unit in - (* net manipulation callbacks *) - let rec shutdown () = - debug "(%a) starting network shutdown" pp_gid my_gid ; - (* stop accepting clients *) - cancel () >>= fun () -> - (* wait for both workers to end *) - Lwt.join [ welcome ; peers_worker ; admin ; maintenance ; unblock ; - discovery_answerer ; discovery_sender ] >>= fun () -> - (* properly shutdown all peers *) - let cancelers = - PeerMap.fold - (fun point _ peer res -> - (peer.disconnect () >>= fun () -> - connected := PeerMap.remove_by_point point !connected ; - Lwt.return_unit) :: res) - !connected @@ - PointMap.fold - (fun point canceler res -> - (canceler () >>= fun () -> - incoming := PointMap.remove point !incoming ; - Lwt.return_unit) :: res) - !incoming @@ [] - in - Lwt.join cancelers >>= fun () -> - debug "(%a) network shutdown complete" pp_gid my_gid ; - Lwt.return_unit - and peers () = - PeerMap.fold (fun _ _ peer r -> peer :: r) !connected [] - and find_peer gid = - try Some (PeerMap.by_gid gid !connected) with Not_found -> None - and peer_info (peer : peer) = { - gid = peer.gid ; - addr = fst peer.point ; - port = snd peer.point ; - version = peer.version ; - total_sent = peer.total_sent () ; - total_recv = peer.total_recv () ; - current_outflow = peer.current_outflow () ; - current_inflow = peer.current_inflow () ; + Real.create ~config ~limits >>= fun net -> + Real.maintain net () >>= fun () -> + Lwt.return { + gid = Real.gid net ; + maintain = Real.maintain net ; + roll = Real.roll net ; + shutdown = Real.shutdown net ; + connections = Real.connections net ; + find_connection = Real.find_connection net ; + connection_info = Real.connection_info net ; + connection_stat = Real.connection_stat net ; + global_stat = Real.global_stat net ; + get_metadata = Real.get_metadata net ; + set_metadata = Real.set_metadata net ; + recv = Real.recv net ; + send = Real.send net ; + try_send = Real.try_send net ; + broadcast = Real.broadcast net ; } - and recv_from () = - Lwt_pipe.pop messages - and send_to peer msg = - peer.send (Message msg) - and try_send_to peer msg = - peer.try_send (Message msg) - and broadcast msg = - PeerMap.iter - (fun _ _ peer -> - Lwt.async (fun () -> peer.send (Message msg))) - !connected - and blacklist ?(duration = limits.blacklist_time) addr = - let t = Unix.gettimeofday () +. duration in - black_list := BlackList.add addr t !black_list ; - debug "(%a) address %a blacklisted" pp_gid my_gid Ipaddr.pp_hum addr ; - (* we ban this peer, but also all the ones at this address, even - when whitelisted (the blacklist operation wins) *) - known_peers := - PeerMap.fold - (fun ((a, _) as point) gid p map -> - if a = addr then map else PeerMap.update point ?gid p map) - !known_peers PeerMap.empty ; - (* we disconnect all peers at this address sur-le-champ *) - PeerMap.iter - (fun (a, _) _ p -> if addr = a then - Lwt.async (fun () -> p.disconnect ())) - !connected ; - (* and prevent incoming connections *) - PointMap.iter - (fun (a, _) cancel -> if a = addr then Lwt.async cancel) - !incoming - and whitelist_point point = - let source, gid = try - { (PeerMap.by_point point !known_peers) - with white_listed = true }, - PeerMap.gid_by_point point !known_peers - with Not_found -> - { unreachable_since = None ; - connections = None ; - white_listed = true ; - meta = P.initial_metadata }, - None in - known_peers := PeerMap.update point ?gid source !known_peers - and whitelist peer = - (* we promote this peer to the white list, if reachable *) - match peer.listening_port with - | Some port -> - let point = fst peer.point, port in - whitelist_point point - | None -> () + let faked_network = { + gid = Fake.id.gid ; + maintain = Lwt.return ; + roll = Lwt.return ; + shutdown = Lwt.return ; + connections = (fun () -> []) ; + find_connection = (fun _ -> None) ; + connection_info = (fun _ -> Fake.connection_info) ; + connection_stat = (fun _ -> Fake.empty_stat) ; + global_stat = (fun () -> Fake.empty_stat) ; + get_metadata = (fun _ -> None) ; + set_metadata = (fun _ _ -> ()) ; + recv = (fun () -> Lwt_utils.never_ending) ; + send = (fun _ _ -> Lwt_utils.never_ending) ; + try_send = (fun _ _ -> false) ; + broadcast = ignore ; + } - and maintain () = - let waiter = LC.wait just_maintained in - LC.broadcast please_maintain () ; - waiter - and roll () = Pervasives.failwith "roll" - and get_metadata _gid = None (* TODO: implement *) - and set_metadata _gid _meta = () (* TODO: implement *) - in - let net = - { gid = my_gid ; shutdown ; peers ; find_peer ; - recv_from ; send_to ; try_send_to ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; - peer_info ; get_metadata ; set_metadata } in - (* main thread, returns after first successful maintenance *) - maintain () >>= fun () -> - debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; - Lwt.return net - - let faked_network = - let gid = String.make 16 '\000' in - let infinity, wakeup = Lwt.wait () in - let shutdown () = - Lwt.wakeup_exn wakeup Queue.Empty; - Lwt.return_unit in - let peers () = [] in - let find_peer _ = None in - let recv_from () = infinity in - let send_to _ _ = Lwt.return_unit in - let try_send_to _ _ = true in - let broadcast _ = () in - let blacklist ?duration _ = ignore duration ; () in - let whitelist _ = () in - let maintain () = Lwt.return_unit in - let roll () = Lwt.return_unit in - let peer_info _ = assert false in - let get_metadata _ = None in - let set_metadata _ _ = () in - { gid ; shutdown ; peers ; find_peer ; - recv_from ; send_to ; try_send_to ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; - peer_info ; get_metadata ; set_metadata } - - - (* Plug toplevel functions to callback calls. *) let gid net = net.gid - let shutdown net = net.shutdown () - let peers net = net.peers () - let find_peer net gid = net.find_peer gid - let peer_info net peer = net.peer_info peer - let recv net = net.recv_from () - let send net peer msg = net.send_to peer msg - let try_send net peer msg = net.try_send_to peer msg - let broadcast net msg = net.broadcast msg let maintain net = net.maintain () let roll net = net.roll () - let blacklist _net _gid = () - let whitelist _net _gid = () - let get_metadata net gid = net.get_metadata gid - let set_metadata net gid meta = net.set_metadata gid meta + let shutdown net = net.shutdown () + let connections net = net.connections () + let find_connection net = net.find_connection + let connection_info net = net.connection_info + let connection_stat net = net.connection_stat + let global_stat net = net.global_stat () + let get_metadata net = net.get_metadata + let set_metadata net = net.set_metadata + let recv net = net.recv () + let send net = net.send + let try_send net = net.try_send + let broadcast net = net.broadcast + + module Raw = struct + type 'a t = 'a P2p_connection_pool.Message.t = + | Bootstrap + | Advertise of P2p_types.Point.t list + | Message of 'a + | Disconnect + type message = Message.t t + let encoding = P2p_connection_pool.Message.encoding Message.encoding + let supported_versions = Message.supported_versions + end + end diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index a5ad8767a..bd5d7c5ae 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -8,86 +8,117 @@ (**************************************************************************) (** A peer connection address *) -type addr = Ipaddr.t +type addr = Ipaddr.V6.t (** A peer connection port *) type port = int (** A p2p protocol version *) -type version = { - name : string ; - major : int ; - minor : int ; -} +module Version = P2p_types.Version + +(** A global identifier for a peer, a.k.a. an identity *) +module Gid = P2p_types.Gid + +module Identity = P2p_types.Identity + +module Point = P2p_types.Point + +module Id_point = P2p_types.Id_point + +module Connection_info = P2p_types.Connection_info + +module Stat = P2p_types.Stat (** Network configuration *) type config = { + + listening_port : port option; (** Tells if incoming connections accepted, precising the TCP port on which the peer can be reached *) - incoming_port : port option ; - (** Tells if peers should be discovered automatically on the local - network, precising the UDP port to use *) - discovery_port : port option ; - (** List of hard-coded known peers to bootstrap the network from *) - known_peers : (addr * port) list ; - (** The path to the JSON file where the peer cache is loaded / stored *) + + listening_addr : addr option; + (** When incoming connections are accepted, precising on which + IP adddress the node listen (default: [[::]]). *) + + trusted_points : Point.t list ; + (** List of hard-coded known peers to bootstrap the network from. *) + peers_file : string ; - (** If [true], the only accepted connections are from peers whose - addresses are in [known_peers] *) + (** The path to the JSON file where the metadata associated to + gids are loaded / stored. *) + closed_network : bool ; + (** If [true], the only accepted connections are from peers whose + addresses are in [trusted_peers]. *) + + identity : Identity.t ; + (** Cryptographic identity of the peer. *) + + proof_of_work_target : Crypto_box.target ; + (** Expected level of proof of work of peers' identity. *) + } (** Network capacities *) type limits = { - (** Maximum length in bytes of network messages *) - max_message_size : int ; - (** Delay after which a non responding peer is considered dead *) - peer_answer_timeout : float ; - (** Minimum number of connections to reach when staring / maitening *) - expected_connections : int ; - (** Strict minimum number of connections (triggers an urgent maintenance) *) + + authentification_timeout : float ; + (** Delay granted to a peer to perform authentication, in seconds. *) + min_connections : int ; - (** Maximum number of connections (exceeding peers are disconnected) *) + (** Strict minimum number of connections (triggers an urgent maintenance) *) + + expected_connections : int ; + (** Targeted number of connections to reach when bootstraping / maitening *) + max_connections : int ; - (** How long peers can be blacklisted for maintenance *) - blacklist_time : float ; + (** Maximum number of connections (exceeding peers are disconnected) *) + + backlog : int ; + (** Argument of [Lwt_unix.accept].*) + + max_incoming_connections : int ; + (** Maximum not-yet-authentified incoming connections. *) + + max_download_speed : int option ; + (** Hard-limit in the number of bytes received per second. *) + + max_upload_speed : int option ; + (** Hard-limit in the number of bytes sent per second. *) + + read_buffer_size : int ; + (** Size in bytes of the buffer passed to [Lwt_unix.read]. *) + + read_queue_size : int option ; + write_queue_size : int option ; + incoming_app_message_queue_size : int option ; + incoming_message_queue_size : int option ; + outgoing_message_queue_size : int option ; + (** Various bounds for internal queues. *) + } -(** A global identifier for a peer, a.k.a. an identity *) -type gid -val pp_gid : Format.formatter -> gid -> unit - -type 'msg encoding = Encoding : { - tag: int ; - encoding: 'a Data_encoding.t ; - wrap: 'a -> 'msg ; - unwrap: 'msg -> 'a option ; - max_length: int option ; - } -> 'msg encoding - -module type PARAMS = sig - - (** Type of message used by higher layers *) - type msg - - val encodings : msg encoding list - - (** Type of metadata associated to an identity *) - type metadata - - val initial_metadata : metadata - val metadata_encoding : metadata Data_encoding.t - val score : metadata -> float +(** Type of message used by higher layers *) +module type MESSAGE = sig + type t + val encoding : t P2p_connection_pool.encoding list (** High level protocol(s) talked by the peer. When two peers initiate a connection, they exchange their list of supported versions. The chosen one, if any, is the maximum common one (in lexicographic order) *) - val supported_versions : version list - + val supported_versions : Version.t list end -module Make (P : PARAMS) : sig +(** Type of metadata associated to an identity *) +module type METADATA = sig + type t + val initial : t + val encoding : t Data_encoding.t + val score : t -> float +end + +module Make (Message : MESSAGE) (Metadata : METADATA) : sig type net @@ -99,7 +130,7 @@ module Make (P : PARAMS) : sig val bootstrap : config:config -> limits:limits -> net Lwt.t (** Return one's gid *) - val gid : net -> gid + val gid : net -> Gid.t (** A maintenance operation : try and reach the ideal number of peers *) val maintain : net -> unit Lwt.t @@ -111,51 +142,47 @@ module Make (P : PARAMS) : sig val shutdown : net -> unit Lwt.t (** A connection to a peer *) - type peer + type connection (** Access the domain of active peers *) - val peers : net -> peer list + val connections : net -> connection list (** Return the active peer with identity [gid] *) - val find_peer : net -> gid -> peer option - - type peer_info = { - gid : gid ; - addr : addr ; - port : port ; - version : version ; - total_sent : int ; - total_recv : int ; - current_inflow : float ; - current_outflow : float ; - } + val find_connection : net -> Gid.t -> connection option (** Access the info of an active peer, if available *) - val peer_info : net -> peer -> peer_info + val connection_info : net -> connection -> Connection_info.t + val connection_stat : net -> connection -> Stat.t + val global_stat : net -> Stat.t (** Accessors for meta information about a global identifier *) - val get_metadata : net -> gid -> P.metadata option - val set_metadata : net -> gid -> P.metadata -> unit + val get_metadata : net -> Gid.t -> Metadata.t option + val set_metadata : net -> Gid.t -> Metadata.t -> unit (** Wait for a message from any peer in the network *) - val recv : net -> (peer * P.msg) Lwt.t + val recv : net -> (connection * Message.t) Lwt.t (** [send net peer msg] is a thread that returns when [msg] has been successfully enqueued in the send queue. *) - val send : net -> peer -> P.msg -> unit Lwt.t + val send : net -> connection -> Message.t -> unit Lwt.t (** [try_send net peer msg] is [true] if [msg] has been added to the send queue for [peer], [false] otherwise *) - val try_send : net -> peer -> P.msg -> bool + val try_send : net -> connection -> Message.t -> bool (** Send a message to all peers *) - val broadcast : net -> P.msg -> unit + val broadcast : net -> Message.t -> unit - (** Shutdown the connection to all peers at this address and stop the - communications with this machine for [duration] seconds *) - val blacklist : net -> gid -> unit - - (** Keep a connection to this pair as often as possible *) - val whitelist : net -> gid -> unit + (**/**) + module Raw : sig + type 'a t = + | Bootstrap + | Advertise of P2p_types.Point.t list + | Message of 'a + | Disconnect + type message = Message.t t + val encoding: message Data_encoding.t + val supported_versions: P2p_types.Version.t list + end end diff --git a/src/node/shell/tezos_p2p.ml b/src/node/shell/tezos_p2p.ml index a013c3281..ce23e38df 100644 --- a/src/node/shell/tezos_p2p.ml +++ b/src/node/shell/tezos_p2p.ml @@ -1,29 +1,30 @@ -module Param = struct +type net_id = Store.net_id - type net_id = Store.net_id +type msg = + | Discover_blocks of net_id * Block_hash.t list (* Block locator *) + | Block_inventory of net_id * Block_hash.t list - type msg = + | Get_blocks of Block_hash.t list + | Block of MBytes.t - | Discover_blocks of net_id * Block_hash.t list (* Block locator *) - | Block_inventory of net_id * Block_hash.t list + | Current_operations of net_id + | Operation_inventory of net_id * Operation_hash.t list - | Get_blocks of Block_hash.t list - | Block of MBytes.t + | Get_operations of Operation_hash.t list + | Operation of MBytes.t - | Current_operations of net_id - | Operation_inventory of net_id * Operation_hash.t list + | Get_protocols of Protocol_hash.t list + | Protocol of MBytes.t - | Get_operations of Operation_hash.t list - | Operation of MBytes.t +module Message = struct - | Get_protocols of Protocol_hash.t list - | Protocol of MBytes.t + type t = msg - let encodings = + let encoding = let open Data_encoding in let case ?max_length ~tag encoding unwrap wrap = - P2p.Encoding { tag; encoding; wrap; unwrap; max_length } in + P2p_connection_pool.Encoding { tag; encoding; wrap; unwrap; max_length } in [ case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding)) (function @@ -71,13 +72,8 @@ module Param = struct (fun proto -> Protocol proto); ] - type metadata = unit - let initial_metadata = () - let metadata_encoding = Data_encoding.empty - let score () = 0. - let supported_versions = - let open P2p in + let open P2p.Version in [ { name = "TEZOS" ; major = 0 ; minor = 0 ; @@ -86,5 +82,15 @@ module Param = struct end -include Param -include P2p.Make(Param) +type metadata = unit + +module Metadata = struct + type t = metadata + let initial = () + let encoding = Data_encoding.empty + let score () = 0. +end + +include Message +include (Metadata : module type of Metadata with type t := metadata) +include P2p.Make(Message)(Metadata) diff --git a/src/node/shell/tezos_p2p.mli b/src/node/shell/tezos_p2p.mli index 9f27f5a32..1dc142de9 100644 --- a/src/node/shell/tezos_p2p.mli +++ b/src/node/shell/tezos_p2p.mli @@ -13,41 +13,30 @@ val bootstrap : config:config -> limits:limits -> net Lwt.t (** A maintenance operation : try and reach the ideal number of peers *) val maintain : net -> unit Lwt.t -(** Voluntarily drop some peers and replace them by new buddies *) +(** Voluntarily drop some connections and replace them by new buddies *) val roll : net -> unit Lwt.t (** Close all connections properly *) val shutdown : net -> unit Lwt.t (** A connection to a peer *) -type peer +type connection -(** Access the domain of active peers *) -val peers : net -> peer list +(** Access the domain of active connections *) +val connections : net -> connection list -(** Return the active peer with identity [gid] *) -val find_peer : net -> gid -> peer option +(** Return the active connection with identity [gid] *) +val find_connection : net -> Gid.t -> connection option -type peer_info = { - gid : gid ; - addr : addr ; - port : port ; - version : version ; - total_sent : int ; - total_recv : int ; - current_inflow : float ; - current_outflow : float ; -} - -(** Access the info of an active peer, if available *) -val peer_info : net -> peer -> peer_info +(** Access the info of an active connection. *) +val connection_info : net -> connection -> Connection_info.t (** Accessors for meta information about a global identifier *) type metadata = unit -val get_metadata : net -> gid -> metadata option -val set_metadata : net -> gid -> metadata -> unit +val get_metadata : net -> Gid.t -> metadata option +val set_metadata : net -> Gid.t -> metadata -> unit type net_id = Store.net_id @@ -68,23 +57,28 @@ type msg = | Get_protocols of Protocol_hash.t list | Protocol of MBytes.t -(** Wait for a payload from any peer in the network *) -val recv : net -> (peer * msg) Lwt.t +(** Wait for a payload from any connection in the network *) +val recv : net -> (connection * msg) Lwt.t -(** [send net peer msg] is a thread that returns when [msg] has been +(** [send net conn msg] is a thread that returns when [msg] has been successfully enqueued in the send queue. *) -val send : net -> peer -> msg -> unit Lwt.t +val send : net -> connection -> msg -> unit Lwt.t -(** [try_send net peer msg] is [true] if [msg] has been added to the +(** [try_send net conn msg] is [true] if [msg] has been added to the send queue for [peer], [false] otherwise *) -val try_send : net -> peer -> msg -> bool +val try_send : net -> connection -> msg -> bool (** Send a payload to all peers *) val broadcast : net -> msg -> unit -(** Shutdown the connection to all peers at this address and stop the - communications with this machine for [duration] seconds *) -val blacklist : net -> gid -> unit - -(** Keep a connection to this pair as often as possible *) -val whitelist : net -> gid -> unit +(**/**) +module Raw : sig + type 'a t = + | Bootstrap + | Advertise of P2p_types.Point.t list + | Message of 'a + | Disconnect + type message = msg t + val encoding: message Data_encoding.t + val supported_versions: P2p_types.Version.t list +end diff --git a/src/node_main.ml b/src/node_main.ml index 8980f40d6..84cfdb75e 100644 --- a/src/node_main.ml +++ b/src/node_main.ml @@ -7,6 +7,8 @@ (* *) (**************************************************************************) +module V6 = Ipaddr.V6 + open Error_monad open Logging.Node.Main @@ -54,15 +56,15 @@ type cfg = { min_connections : int ; max_connections : int ; expected_connections : int ; - net_addr : Ipaddr.t ; + net_addr : V6.t ; net_port : int ; - local_discovery : int option ; - peers : (Ipaddr.t * int) list ; + (* local_discovery : (string * int) option ; *) + peers : (V6.t * int) list ; peers_cache : string ; closed : bool ; (* rpc *) - rpc_addr : (Ipaddr.t * int) option ; + rpc_addr : (V6.t * int) option ; cors_origins : string list ; cors_headers : string list ; rpc_crt : string option ; @@ -88,9 +90,9 @@ let default_cfg_of_base_dir base_dir = { min_connections = 4 ; max_connections = 400 ; expected_connections = 20 ; - net_addr = Ipaddr.(V6 V6.unspecified) ; + net_addr = V6.unspecified ; net_port = 9732 ; - local_discovery = None ; + (* local_discovery = None ; *) peers = [] ; closed = false ; peers_cache = base_dir // "peers_cache" ; @@ -130,16 +132,21 @@ let sockaddr_of_string str = let addr, port = String.sub str 0 pos, String.sub str (pos+1) (len - pos - 1) in match Ipaddr.of_string_exn addr, int_of_string port with | exception Failure _ -> `Error "not a sockaddr" - | ip, port -> `Ok (ip, port) + | V4 ipv4, port -> `Ok (Ipaddr.v6_of_v4 ipv4, port) + | V6 ipv6, port -> `Ok (ipv6, port) let sockaddr_of_string_exn str = match sockaddr_of_string str with | `Ok saddr -> saddr | `Error msg -> invalid_arg msg -let pp_sockaddr fmt (ip, port) = Format.fprintf fmt "%a:%d" Ipaddr.pp_hum ip port +let pp_sockaddr fmt (ip, port) = Format.fprintf fmt "%a:%d" V6.pp_hum ip port let string_of_sockaddr saddr = Format.asprintf "%a" pp_sockaddr saddr +let mcast_params_of_string s = match Utils.split ':' s with + | [iface; port] -> iface, int_of_string port + | _ -> invalid_arg "mcast_params_of_string" + module Cfg_file = struct open Data_encoding @@ -150,12 +157,12 @@ module Cfg_file = struct (opt "protocol" string) let net = - obj8 + obj7 (opt "min-connections" uint16) (opt "max-connections" uint16) (opt "expected-connections" uint16) (opt "addr" string) - (opt "local-discovery" uint16) + (* (opt "local-discovery" string) *) (opt "peers" (list string)) (dft "closed" bool false) (opt "peers-cache" string) @@ -174,21 +181,29 @@ module Cfg_file = struct conv (fun { store ; context ; protocol ; min_connections ; max_connections ; expected_connections ; - net_addr ; net_port ; local_discovery ; peers ; + net_addr ; net_port ; + (* local_discovery ; *) + peers ; closed ; peers_cache ; rpc_addr ; cors_origins ; cors_headers ; log_output } -> let net_addr = string_of_sockaddr (net_addr, net_port) in + (* let local_discovery = Utils.map_option local_discovery *) + (* ~f:(fun (iface, port) -> iface ^ ":" ^ string_of_int port) *) + (* in *) let rpc_addr = Utils.map_option string_of_sockaddr rpc_addr in let peers = ListLabels.map peers ~f:string_of_sockaddr in let log_output = string_of_log log_output in ((Some store, Some context, Some protocol), (Some min_connections, Some max_connections, Some expected_connections, - Some net_addr, local_discovery, Some peers, closed, Some peers_cache), + Some net_addr, + (* local_discovery, *) + Some peers, closed, Some peers_cache), (rpc_addr, cors_origins, cors_headers), Some log_output)) (fun ( (store, context, protocol), (min_connections, max_connections, expected_connections, net_addr, - local_discovery, peers, closed, peers_cache), + (* local_discovery, *) + peers, closed, peers_cache), (rpc_addr, cors_origins, cors_headers), log_output) -> let open Utils in @@ -205,11 +220,14 @@ module Cfg_file = struct let min_connections = unopt default_cfg.min_connections min_connections in let max_connections = unopt default_cfg.max_connections max_connections in let expected_connections = unopt default_cfg.expected_connections expected_connections in + (* let local_discovery = map_option local_discovery ~f:mcast_params_of_string in *) { default_cfg with store ; context ; protocol ; - min_connections; max_connections; expected_connections; - net_addr; net_port ; local_discovery; peers; closed; peers_cache; - rpc_addr; cors_origins ; cors_headers ; log_output + min_connections ; max_connections ; expected_connections ; + net_addr ; net_port ; + (* local_discovery ; *) + peers ; closed ; peers_cache ; + rpc_addr ; cors_origins ; cors_headers ; log_output ; } ) (obj4 @@ -266,9 +284,9 @@ module Cmdline = struct let net_addr = let doc = "The TCP address and port at which this instance can be reached." in Arg.(value & opt (some sockaddr_converter) None & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["net-addr"]) - let local_discovery = - let doc = "Automatic discovery of peers on the local network." in - Arg.(value & opt (some int) None & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["local-discovery"]) + (* let local_discovery = *) + (* let doc = "Automatic discovery of peers on the local network." in *) + (* Arg.(value & opt (some @@ pair string int) None & info ~docs:"NETWORK" ~doc ~docv:"IFACE:PORT" ["local-discovery"]) *) let peers = let doc = "A peer to bootstrap the network from. Can be used several times to add several peers." in Arg.(value & opt_all sockaddr_converter [] & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["peer"]) @@ -298,7 +316,9 @@ module Cmdline = struct let parse base_dir config_file sandbox sandbox_param log_level min_connections max_connections expected_connections - net_saddr local_discovery peers closed rpc_addr tls cors_origins cors_headers reset_cfg update_cfg = + net_saddr + (* local_discovery *) + peers closed rpc_addr tls cors_origins cors_headers reset_cfg update_cfg = let base_dir = Utils.(unopt (unopt default_cfg.base_dir base_dir) sandbox) in let config_file = Utils.(unopt ((unopt base_dir sandbox) // "config")) config_file in @@ -340,7 +360,7 @@ module Cmdline = struct expected_connections = Utils.unopt cfg.expected_connections expected_connections ; net_addr = (match net_saddr with None -> cfg.net_addr | Some (addr, _) -> addr) ; net_port = (match net_saddr with None -> cfg.net_port | Some (_, port) -> port) ; - local_discovery = Utils.first_some local_discovery cfg.local_discovery ; + (* local_discovery = Utils.first_some local_discovery cfg.local_discovery ; *) peers = (match peers with [] -> cfg.peers | _ -> peers) ; closed = closed || cfg.closed ; rpc_addr = Utils.first_some rpc_addr cfg.rpc_addr ; @@ -359,7 +379,9 @@ module Cmdline = struct ret (const parse $ base_dir $ config_file $ sandbox $ sandbox_param $ v $ min_connections $ max_connections $ expected_connections - $ net_addr $ local_discovery $ peers $ closed + $ net_addr + (* $ local_discovery *) + $ peers $ closed $ rpc_addr $ rpc_tls $ cors_origins $ cors_headers $ reset_config $ update_config ), @@ -391,10 +413,11 @@ let init_logger { log_output ; log_level } = | `Null -> Logging.init Null | `Syslog -> Logging.init Syslog -let init_node { sandbox ; sandbox_param ; - store ; context ; - min_connections ; max_connections ; expected_connections ; - net_port ; peers ; peers_cache ; local_discovery ; closed } = +let init_node + { sandbox ; sandbox_param ; + store ; context ; + min_connections ; max_connections ; expected_connections ; + net_port ; peers ; peers_cache ; closed } = let patch_context json ctxt = let module Proto = (val Updater.get_exn genesis_protocol) in Lwt.catch @@ -428,20 +451,48 @@ let init_node { sandbox ; sandbox_param ; match sandbox with | Some _ -> None | None -> + (* TODO add parameters... *) + let authentification_timeout = 5. + and backlog = 20 + and max_incoming_connections = 20 + and max_download_speed = None + and max_upload_speed = None + and read_buffer_size = 1 lsl 14 + and read_queue_size = None + and write_queue_size = None + and incoming_app_message_queue_size = None + and incoming_message_queue_size = None + and outgoing_message_queue_size = None in let limits = - { max_message_size = 10_000 ; - peer_answer_timeout = 5. ; - expected_connections ; + { authentification_timeout ; min_connections ; + expected_connections ; max_connections ; - blacklist_time = 30. } + backlog ; + max_incoming_connections ; + max_download_speed ; + max_upload_speed ; + read_buffer_size ; + read_queue_size ; + write_queue_size ; + incoming_app_message_queue_size ; + incoming_message_queue_size ; + outgoing_message_queue_size ; + } in + (* TODO add parameters... *) + let identity = P2p.Identity.generate Crypto_box.default_target + and listening_addr = None + and proof_of_work_target = Crypto_box.default_target in let config = - { incoming_port = Some net_port ; - discovery_port = local_discovery ; - known_peers = peers ; + { listening_port = Some net_port ; + listening_addr ; + identity ; + trusted_points = peers ; peers_file = peers_cache ; - closed_network = closed } + closed_network = closed ; + proof_of_work_target ; + } in Some (config, limits) in Node.create @@ -458,7 +509,7 @@ let init_rpc { rpc_addr ; rpc_crt; rpc_key ; cors_origins ; cors_headers } node lwt_log_notice "Starting the RPC server listening on port %d (TLS enabled)." port >>= fun () -> let dir = Node_rpc.build_rpc_directory node in let mode = `TLS (`Crt_file_path crt, `Key_file_path key, `No_password, `Port port) in - let host = Ipaddr.to_string addr in + let host = Ipaddr.V6.to_string addr in let () = let old_hook = !Lwt.async_exception_hook in Lwt.async_exception_hook := function