diff --git a/src/node/main/node_config_file.ml b/src/node/main/node_config_file.ml index 07ea8695c..8fd93c050 100644 --- a/src/node/main/node_config_file.ml +++ b/src/node/main/node_config_file.ml @@ -68,6 +68,10 @@ let default_net_limits : P2p.limits = { incoming_app_message_queue_size = None ; incoming_message_queue_size = None ; outgoing_message_queue_size = None ; + known_points_history_size = 500 ; + known_gids_history_size = 500 ; + max_known_points = Some (400, 300) ; + max_known_gids = Some (400, 300) ; } let default_net = { @@ -108,49 +112,68 @@ let limit : P2p.limits Data_encoding.t = max_download_speed ; max_upload_speed ; read_buffer_size ; read_queue_size ; write_queue_size ; incoming_app_message_queue_size ; - incoming_message_queue_size ; outgoing_message_queue_size } -> + incoming_message_queue_size ; outgoing_message_queue_size ; + known_points_history_size ; known_gids_history_size ; + max_known_points ; max_known_gids ; + } -> ( ( authentification_timeout, min_connections, expected_connections, max_connections, backlog, max_incoming_connections, max_download_speed, max_upload_speed) , ( read_buffer_size, read_queue_size, write_queue_size, incoming_app_message_queue_size, - incoming_message_queue_size, outgoing_message_queue_size ))) + incoming_message_queue_size, outgoing_message_queue_size, + known_points_history_size, known_gids_history_size, + max_known_points, max_known_gids + ))) (fun ( ( authentification_timeout, min_connections, expected_connections, max_connections, backlog, max_incoming_connections, max_download_speed, max_upload_speed) , ( read_buffer_size, read_queue_size, write_queue_size, incoming_app_message_queue_size, - incoming_message_queue_size, outgoing_message_queue_size ) ) -> + incoming_message_queue_size, outgoing_message_queue_size, + known_points_history_size, known_gids_history_size, + max_known_points, max_known_gids + ) ) -> { authentification_timeout ; min_connections ; expected_connections ; max_connections ; backlog ; max_incoming_connections ; max_download_speed ; max_upload_speed ; read_buffer_size ; read_queue_size ; write_queue_size ; incoming_app_message_queue_size ; - incoming_message_queue_size ; outgoing_message_queue_size }) + incoming_message_queue_size ; outgoing_message_queue_size ; + known_points_history_size ; known_gids_history_size ; + max_known_points ; max_known_gids + }) (merge_objs (obj8 (dft "authentification-timeout" float default_net_limits.authentification_timeout) - (dft "min-connections" int31 + (dft "min-connections" uint16 default_net_limits.min_connections) - (dft "expected-connections" int31 + (dft "expected-connections" uint16 default_net_limits.expected_connections) - (dft "max-connections" int31 + (dft "max-connections" uint16 default_net_limits.max_connections) - (dft "backlog" int31 + (dft "backlog" uint8 default_net_limits.backlog) - (dft "max-incoming-connections" int31 + (dft "max-incoming-connections" uint8 default_net_limits.max_incoming_connections) (opt "max-download-speed" int31) (opt "max-upload-speed" int31)) - (obj6 + (obj10 (dft "read-buffer-size" int31 default_net_limits.read_buffer_size) (opt "read-queue-size" int31) (opt "write-queue-size" int31) (opt "incoming-app-message-queue-size" int31) (opt "incoming-message-queue-size" int31) - (opt "outgoing-message-queue-size" int31))) + (opt "outgoing-message-queue-size" int31) + (dft "known_points_history_size" uint16 + default_net_limits.known_points_history_size) + (dft "known_gids_history_size" uint16 + default_net_limits.known_points_history_size) + (opt "max_known_points" (tup2 uint16 uint16)) + (opt "max_known_gids" (tup2 uint16 uint16)) + )) let net = let open Data_encoding in @@ -241,6 +264,7 @@ let update ?max_connections ?max_download_speed ?max_upload_speed + ?peer_table_size ?expected_pow ?bootstrap_peers ?listen_addr @@ -251,6 +275,8 @@ let update ?rpc_tls ?log_output cfg = + let peer_table_size = + map_option peer_table_size ~f:(fun i -> i, i / 4 * 3) in let unopt_list ~default = function | [] -> default | l -> l in @@ -274,6 +300,12 @@ let update max_upload_speed = Utils.first_some max_upload_speed cfg.net.limits.max_upload_speed ; + max_known_points = + Utils.first_some + peer_table_size cfg.net.limits.max_known_points ; + max_known_gids = + Utils.first_some + peer_table_size cfg.net.limits.max_known_gids ; } in let net : net = { expected_pow = diff --git a/src/node/main/node_config_file.mli b/src/node/main/node_config_file.mli index db5d377a0..487e3ffcc 100644 --- a/src/node/main/node_config_file.mli +++ b/src/node/main/node_config_file.mli @@ -56,6 +56,7 @@ val update: ?max_connections:int -> ?max_download_speed:int -> ?max_upload_speed:int -> + ?peer_table_size:int -> ?expected_pow:float -> ?bootstrap_peers:string list -> ?listen_addr:string -> diff --git a/src/node/main/node_shared_arg.ml b/src/node/main/node_shared_arg.ml index b94de88da..99e5d1fb3 100644 --- a/src/node/main/node_shared_arg.ml +++ b/src/node/main/node_shared_arg.ml @@ -20,6 +20,7 @@ type t = { max_connections: int option ; max_download_speed: int option ; max_upload_speed: int option ; + peer_table_size: int option ; expected_pow: float option ; peers: string list ; no_bootstrap_peers: bool ; @@ -35,6 +36,7 @@ type t = { let wrap data_dir config_file connections max_download_speed max_upload_speed + peer_table_size listen_addr peers no_bootstrap_peers closed expected_pow rpc_listen_addr rpc_tls cors_origins cors_headers log_output = @@ -74,6 +76,7 @@ let wrap cors_headers ; rpc_tls ; log_output ; + peer_table_size ; } module Manpage = struct @@ -147,6 +150,13 @@ module Term = struct Arg.(value & opt (some int) None & info ~docs ~doc ~docv:"NUM" ["max-upload-speed"]) + let peer_table_size = + let doc = "Maximum size of internal peer tables, \ + used to store metadata/logs about a peer or about a \ + to-be-authenticated host:port couple." in + Arg.(value & opt (some int) None & + info ~docs ~doc ~docv:"NUM" ["peer-table-size"]) + let listen_addr = let doc = "The TCP address and port at which this instance can be reached." in @@ -214,6 +224,7 @@ module Term = struct const wrap $ data_dir $ config_file $ connections $ max_download_speed $ max_upload_speed + $ peer_table_size $ listen_addr $ peers $ no_bootstrap_peers $ closed $ expected_pow $ rpc_listen_addr $ rpc_tls $ cors_origins $ cors_headers @@ -231,6 +242,7 @@ let read_and_patch_config_file args = let { data_dir ; min_connections ; expected_connections ; max_connections ; max_download_speed ; max_upload_speed ; + peer_table_size ; expected_pow ; peers ; no_bootstrap_peers ; listen_addr ; closed ; @@ -245,6 +257,6 @@ let read_and_patch_config_file args = return @@ Node_config_file.update ?data_dir ?min_connections ?expected_connections ?max_connections - ?max_download_speed ?max_upload_speed ?expected_pow + ?max_download_speed ?max_upload_speed ?peer_table_size ?expected_pow ~bootstrap_peers ?listen_addr ?rpc_listen_addr ~closed ~cors_origins ~cors_headers ?rpc_tls ?log_output cfg diff --git a/src/node/main/node_shared_arg.mli b/src/node/main/node_shared_arg.mli index dee36d295..574612dc6 100644 --- a/src/node/main/node_shared_arg.mli +++ b/src/node/main/node_shared_arg.mli @@ -17,6 +17,7 @@ type t = { max_connections: int option ; max_download_speed: int option ; max_upload_speed: int option ; + peer_table_size: int option ; expected_pow: float option ; peers: string list ; no_bootstrap_peers: bool ; diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 49451ca57..053aea871 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -14,6 +14,7 @@ include Logging.Make(struct let name = "p2p" end) type 'meta meta_config = 'meta P2p_connection_pool.meta_config = { encoding : 'meta Data_encoding.t; initial : 'meta; + score : 'meta -> float } type 'msg app_message_encoding = 'msg P2p_connection_pool.encoding = @@ -61,6 +62,10 @@ type limits = { incoming_message_queue_size : int option ; outgoing_message_queue_size : int option ; + known_gids_history_size : int ; + known_points_history_size : int ; + max_known_gids : (int * int) option ; + max_known_points : (int * int) option ; } let create_scheduler limits = @@ -91,6 +96,10 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched = incoming_app_message_queue_size = limits.incoming_app_message_queue_size ; incoming_message_queue_size = limits.incoming_message_queue_size ; outgoing_message_queue_size = limits.outgoing_message_queue_size ; + known_gids_history_size = limits.known_gids_history_size ; + known_points_history_size = limits.known_points_history_size ; + max_known_points = limits.max_known_points ; + max_known_gids = limits.max_known_gids ; } in let pool = diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index d2cc1c64a..24967335d 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -32,6 +32,7 @@ module Stat = P2p_types.Stat type 'meta meta_config = { encoding : 'meta Data_encoding.t; initial : 'meta; + score : 'meta -> float } type 'msg app_message_encoding = Encoding : { @@ -114,6 +115,13 @@ type limits = { outgoing_message_queue_size : int option ; (** Various bounds for internal queues. *) + known_gids_history_size : int ; + known_points_history_size : int ; + (** Size of circular log buffers, in number of events recorded. *) + + max_known_gids : (int * int) option ; + max_known_points : (int * int) option ; + (** Optional limitation of internal hashtables (max, target) *) } type ('msg, 'meta) t diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 68e4727a3..408051a9d 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -144,11 +144,16 @@ type config = { incoming_message_queue_size : int option ; outgoing_message_queue_size : int option ; + known_gids_history_size : int ; + known_points_history_size : int ; + max_known_points : (int * int) option ; (* max, gc target *) + max_known_gids : (int * int) option ; (* max, gc target *) } type 'meta meta_config = { encoding : 'meta Data_encoding.t; initial : 'meta; + score : 'meta -> float; } type 'msg message_config = { @@ -190,19 +195,81 @@ and ('msg, 'meta) connection = { type ('msg, 'meta) pool = ('msg, 'meta) t +module GcPointSet = Utils.Bounded(struct + type t = Time.t * Point.t + let compare (x, _) (y, _) = - (Time.compare x y) + end) + +let gc_points { config = { max_known_points } ; known_points } = + match max_known_points with + | None -> () + | Some (_, target) -> + let now = Time.now () in (* TODO: maybe time of discovery? *) + let table = GcPointSet.create target in + Point.Table.iter (fun p pi -> + if Point_info.State.is_disconnected pi then + let time = + match Point_info.last_miss pi with + | None -> now + | Some t -> t in + GcPointSet.insert (time, p) table + ) known_points ; + let to_remove = GcPointSet.get table in + ListLabels.iter to_remove ~f:begin fun (_, p) -> + Point.Table.remove known_points p + end + let register_point pool ?trusted (addr, port as point) = match Point.Table.find pool.known_points point with | exception Not_found -> let pi = Point_info.create ?trusted addr port in + iter_option pool.config.max_known_points ~f:begin fun (max, _) -> + if Point.Table.length pool.known_points >= max then gc_points pool + end ; Point.Table.add pool.known_points point pi ; pi | pi -> pi + +(* Bounded table used to garbage collect gid infos when needed. The + strategy used is to remove the info of the gid with the lowest + score first. In case of equality, the info of the most recent added + gid is removed. The rationale behind this choice is that in the + case of a flood attack, the newly added infos will probably belong + to gids with the same (low) score and removing the most recent ones + ensure that older (and probably legit) gid infos are kept. *) +module GcGidSet = Utils.Bounded(struct + type t = float * Time.t * Gid.t + let compare (s, t, _) (s', t', _) = + let score_cmp = Pervasives.compare s s' in + if score_cmp = 0 then Time.compare t t' else - score_cmp + end) + +let gc_gids { meta_config = { score } ; + config = { max_known_gids } ; + known_gids ; } = + match max_known_gids with + | None -> () + | Some (_, target) -> + let table = GcGidSet.create target in + Gid.Table.iter (fun gid gid_info -> + let created = Gid_info.created gid_info in + let score = score @@ Gid_info.metadata gid_info in + GcGidSet.insert (score, created, gid) table + ) known_gids ; + let to_remove = GcGidSet.get table in + ListLabels.iter to_remove ~f:begin fun (_, _, gid) -> + Gid.Table.remove known_gids gid + end + let register_peer pool gid = match Gid.Table.find pool.known_gids gid with | exception Not_found -> Lwt_condition.broadcast pool.events.new_point () ; let peer = Gid_info.create gid ~metadata:pool.meta_config.initial in + iter_option pool.config.max_known_gids ~f:begin fun (max, _) -> + if Gid.Table.length pool.known_gids >= max then gc_gids pool + end ; Gid.Table.add pool.known_gids gid peer ; peer | peer -> peer diff --git a/src/node/net/p2p_connection_pool.mli b/src/node/net/p2p_connection_pool.mli index 7eaf08445..62f4e67a2 100644 --- a/src/node/net/p2p_connection_pool.mli +++ b/src/node/net/p2p_connection_pool.mli @@ -92,11 +92,28 @@ type config = { outgoing_message_queue_size : int option ; (** Size of the outgoing message queue internal to a peer's Writer (See [P2p_connection.accept]). *) + + known_gids_history_size : int ; + (** Size of the known gids log buffer (default: 50) *) + known_points_history_size : int ; + (** Size of the known points log buffer (default: 50) *) + + max_known_points : (int * int) option ; + (** Parameters for the the garbage collection of known points. If + None, no garbage collection is performed. Otherwise, the first + integer of the couple limits the size of the "known points" + table. When this number is reached, the table is expurged from + disconnected points, older first, to try to reach the amount of + connections indicated by the second integer. *) + + max_known_gids : (int * int) option ; + (** Like [max_known_points], but for known gids. *) } type 'meta meta_config = { encoding : 'meta Data_encoding.t; initial : 'meta; + score : 'meta -> float; } type 'msg message_config = { diff --git a/src/node/net/p2p_connection_pool_types.ml b/src/node/net/p2p_connection_pool_types.ml index dabbc5a96..b550d4f87 100644 --- a/src/node/net/p2p_connection_pool_types.ml +++ b/src/node/net/p2p_connection_pool_types.ml @@ -283,14 +283,15 @@ module Gid_info = struct type ('conn, 'meta) t = { gid : Gid.t ; + created : Time.t ; mutable state : 'conn state ; mutable metadata : 'meta ; mutable trusted : bool ; + events : Event.t Ring.t ; mutable last_failed_connection : (Id_point.t * Time.t) option ; mutable last_rejected_connection : (Id_point.t * Time.t) option ; mutable last_established_connection : (Id_point.t * Time.t) option ; mutable last_disconnection : (Id_point.t * Time.t) option ; - events : Event.t Ring.t ; } type ('conn, 'meta) gid_info = ('conn, 'meta) t @@ -298,42 +299,44 @@ module Gid_info = struct let log_size = 100 - let create ?(trusted = false) ~metadata gid = + let create ?(created = Time.now ()) ?(trusted = false) ~metadata gid = { gid ; + created ; state = Disconnected ; metadata ; trusted ; - events = Ring.create log_size ; last_failed_connection = None ; last_rejected_connection = None ; last_established_connection = None ; last_disconnection = None ; + events = Ring.create log_size ; } let encoding metadata_encoding = let open Data_encoding in conv - (fun { gid ; trusted ; metadata ; events ; + (fun { gid ; trusted ; metadata ; events ; created ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection } -> - (gid, trusted, metadata, Ring.elements events, + (gid, created, trusted, metadata, Ring.elements events, last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection)) - (fun (gid, trusted, metadata, event_list, + (fun (gid, created, trusted, metadata, event_list, last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection) -> let info = create ~trusted ~metadata gid in let events = Ring.create log_size in Ring.add_list info.events event_list ; - { state = Disconnected ; - trusted ; gid ; metadata ; events ; + { gid ; created ; state = Disconnected ; + trusted ; metadata ; events ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; }) - (obj8 + (obj9 (req "gid" Gid.encoding) + (req "created" Time.encoding) (dft "trusted" bool false) (req "metadata" metadata_encoding) (dft "events" (list Event.encoding) []) @@ -347,6 +350,7 @@ module Gid_info = struct (tup2 Id_point.encoding Time.encoding))) let gid { gid } = gid + let created { created } = created let metadata { metadata } = metadata let set_metadata gi metadata = gi.metadata <- metadata let trusted { trusted } = trusted diff --git a/src/node/net/p2p_connection_pool_types.mli b/src/node/net/p2p_connection_pool_types.mli index be56dcd56..2773c7743 100644 --- a/src/node/net/p2p_connection_pool_types.mli +++ b/src/node/net/p2p_connection_pool_types.mli @@ -58,6 +58,12 @@ module Point_info : sig val last_miss : 'conn point_info -> Time.t option + (** [last_miss pi] is the most recent of: + + * last failed connection + * last rejected connection + * last disconnection + *) val greylisted : ?now:Time.t -> 'conn point_info -> bool @@ -145,6 +151,7 @@ module Gid_info : sig val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int val create : + ?created:Time.t -> ?trusted:bool -> metadata:'meta -> Gid.t -> ('conn, 'meta) gid_info @@ -153,6 +160,7 @@ module Gid_info : sig val gid : ('conn, 'meta) gid_info -> Gid.t + val created : ('conn, 'meta) gid_info -> Time.t val metadata : ('conn, 'meta) gid_info -> 'meta val set_metadata : ('conn, 'meta) gid_info -> 'meta -> unit diff --git a/src/node/shell/tezos_p2p.ml b/src/node/shell/tezos_p2p.ml index 26ffe1d3a..a0c55cf7d 100644 --- a/src/node/shell/tezos_p2p.ml +++ b/src/node/shell/tezos_p2p.ml @@ -97,6 +97,7 @@ end let meta_cfg : _ P2p.meta_config = { P2p.encoding = Metadata.encoding ; initial = Metadata.initial ; + score = Metadata.score } and msg_cfg : _ P2p.message_config = { diff --git a/test/test_p2p_connection_pool.ml b/test/test_p2p_connection_pool.ml index 4663e9a5e..5595a2896 100644 --- a/test/test_p2p_connection_pool.ml +++ b/test/test_p2p_connection_pool.ml @@ -33,6 +33,7 @@ type metadata = unit let meta_config : metadata P2p_connection_pool.meta_config = { encoding = Data_encoding.empty ; initial = () ; + score = fun () -> 0. ; } let rec connect ~timeout pool point = @@ -128,6 +129,7 @@ let make_net points repeat n = let point, points = Utils.select n points in let proof_of_work_target = Crypto_box.make_target 0. in let identity = Identity.generate proof_of_work_target in + let nb_points = List.length points in let config = P2p_connection_pool.{ identity ; proof_of_work_target ; @@ -135,13 +137,17 @@ let make_net points repeat n = peers_file = "/dev/null" ; closed_network = true ; listening_port = Some (snd point) ; - min_connections = List.length points ; - max_connections = List.length points ; - max_incoming_connections = List.length points ; + min_connections = nb_points ; + max_connections = nb_points ; + max_incoming_connections = nb_points ; authentification_timeout = 2. ; incoming_app_message_queue_size = None ; incoming_message_queue_size = None ; outgoing_message_queue_size = None ; + known_gids_history_size = 100 ; + known_points_history_size = 100 ; + max_known_points = None ; + max_known_gids = None ; } in Process.detach ~prefix:(Format.asprintf "%a " Gid.pp identity.gid)