Shell: limit known points table size

* Gc events still need to be recorded
This commit is contained in:
Vincent Bernardoff 2017-01-24 14:36:42 +01:00
parent 0a3ad7de53
commit efc6d285c6
12 changed files with 190 additions and 24 deletions

View File

@ -68,6 +68,10 @@ let default_net_limits : P2p.limits = {
incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
known_points_history_size = 500 ;
known_gids_history_size = 500 ;
max_known_points = Some (400, 300) ;
max_known_gids = Some (400, 300) ;
}
let default_net = {
@ -108,49 +112,68 @@ let limit : P2p.limits Data_encoding.t =
max_download_speed ; max_upload_speed ;
read_buffer_size ; read_queue_size ; write_queue_size ;
incoming_app_message_queue_size ;
incoming_message_queue_size ; outgoing_message_queue_size } ->
incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_gids_history_size ;
max_known_points ; max_known_gids ;
} ->
( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed) ,
( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size )))
incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_gids_history_size,
max_known_points, max_known_gids
)))
(fun ( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed) ,
( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size ) ) ->
incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_gids_history_size,
max_known_points, max_known_gids
) ) ->
{ authentification_timeout ; min_connections ; expected_connections ;
max_connections ; backlog ; max_incoming_connections ;
max_download_speed ; max_upload_speed ;
read_buffer_size ; read_queue_size ; write_queue_size ;
incoming_app_message_queue_size ;
incoming_message_queue_size ; outgoing_message_queue_size })
incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_gids_history_size ;
max_known_points ; max_known_gids
})
(merge_objs
(obj8
(dft "authentification-timeout"
float default_net_limits.authentification_timeout)
(dft "min-connections" int31
(dft "min-connections" uint16
default_net_limits.min_connections)
(dft "expected-connections" int31
(dft "expected-connections" uint16
default_net_limits.expected_connections)
(dft "max-connections" int31
(dft "max-connections" uint16
default_net_limits.max_connections)
(dft "backlog" int31
(dft "backlog" uint8
default_net_limits.backlog)
(dft "max-incoming-connections" int31
(dft "max-incoming-connections" uint8
default_net_limits.max_incoming_connections)
(opt "max-download-speed" int31)
(opt "max-upload-speed" int31))
(obj6
(obj10
(dft "read-buffer-size" int31
default_net_limits.read_buffer_size)
(opt "read-queue-size" int31)
(opt "write-queue-size" int31)
(opt "incoming-app-message-queue-size" int31)
(opt "incoming-message-queue-size" int31)
(opt "outgoing-message-queue-size" int31)))
(opt "outgoing-message-queue-size" int31)
(dft "known_points_history_size" uint16
default_net_limits.known_points_history_size)
(dft "known_gids_history_size" uint16
default_net_limits.known_points_history_size)
(opt "max_known_points" (tup2 uint16 uint16))
(opt "max_known_gids" (tup2 uint16 uint16))
))
let net =
let open Data_encoding in
@ -241,6 +264,7 @@ let update
?max_connections
?max_download_speed
?max_upload_speed
?peer_table_size
?expected_pow
?bootstrap_peers
?listen_addr
@ -251,6 +275,8 @@ let update
?rpc_tls
?log_output
cfg =
let peer_table_size =
map_option peer_table_size ~f:(fun i -> i, i / 4 * 3) in
let unopt_list ~default = function
| [] -> default
| l -> l in
@ -274,6 +300,12 @@ let update
max_upload_speed =
Utils.first_some
max_upload_speed cfg.net.limits.max_upload_speed ;
max_known_points =
Utils.first_some
peer_table_size cfg.net.limits.max_known_points ;
max_known_gids =
Utils.first_some
peer_table_size cfg.net.limits.max_known_gids ;
} in
let net : net = {
expected_pow =

View File

@ -56,6 +56,7 @@ val update:
?max_connections:int ->
?max_download_speed:int ->
?max_upload_speed:int ->
?peer_table_size:int ->
?expected_pow:float ->
?bootstrap_peers:string list ->
?listen_addr:string ->

View File

@ -20,6 +20,7 @@ type t = {
max_connections: int option ;
max_download_speed: int option ;
max_upload_speed: int option ;
peer_table_size: int option ;
expected_pow: float option ;
peers: string list ;
no_bootstrap_peers: bool ;
@ -35,6 +36,7 @@ type t = {
let wrap
data_dir config_file
connections max_download_speed max_upload_speed
peer_table_size
listen_addr peers no_bootstrap_peers closed expected_pow
rpc_listen_addr rpc_tls
cors_origins cors_headers log_output =
@ -74,6 +76,7 @@ let wrap
cors_headers ;
rpc_tls ;
log_output ;
peer_table_size ;
}
module Manpage = struct
@ -147,6 +150,13 @@ module Term = struct
Arg.(value & opt (some int) None &
info ~docs ~doc ~docv:"NUM" ["max-upload-speed"])
let peer_table_size =
let doc = "Maximum size of internal peer tables, \
used to store metadata/logs about a peer or about a \
to-be-authenticated host:port couple." in
Arg.(value & opt (some int) None &
info ~docs ~doc ~docv:"NUM" ["peer-table-size"])
let listen_addr =
let doc =
"The TCP address and port at which this instance can be reached." in
@ -214,6 +224,7 @@ module Term = struct
const wrap $ data_dir $ config_file
$ connections
$ max_download_speed $ max_upload_speed
$ peer_table_size
$ listen_addr $ peers $ no_bootstrap_peers $ closed $ expected_pow
$ rpc_listen_addr $ rpc_tls
$ cors_origins $ cors_headers
@ -231,6 +242,7 @@ let read_and_patch_config_file args =
let { data_dir ;
min_connections ; expected_connections ; max_connections ;
max_download_speed ; max_upload_speed ;
peer_table_size ;
expected_pow ;
peers ; no_bootstrap_peers ;
listen_addr ; closed ;
@ -245,6 +257,6 @@ let read_and_patch_config_file args =
return @@
Node_config_file.update
?data_dir ?min_connections ?expected_connections ?max_connections
?max_download_speed ?max_upload_speed ?expected_pow
?max_download_speed ?max_upload_speed ?peer_table_size ?expected_pow
~bootstrap_peers ?listen_addr ?rpc_listen_addr
~closed ~cors_origins ~cors_headers ?rpc_tls ?log_output cfg

View File

@ -17,6 +17,7 @@ type t = {
max_connections: int option ;
max_download_speed: int option ;
max_upload_speed: int option ;
peer_table_size: int option ;
expected_pow: float option ;
peers: string list ;
no_bootstrap_peers: bool ;

View File

@ -14,6 +14,7 @@ include Logging.Make(struct let name = "p2p" end)
type 'meta meta_config = 'meta P2p_connection_pool.meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float
}
type 'msg app_message_encoding = 'msg P2p_connection_pool.encoding =
@ -61,6 +62,10 @@ type limits = {
incoming_message_queue_size : int option ;
outgoing_message_queue_size : int option ;
known_gids_history_size : int ;
known_points_history_size : int ;
max_known_gids : (int * int) option ;
max_known_points : (int * int) option ;
}
let create_scheduler limits =
@ -91,6 +96,10 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched =
incoming_app_message_queue_size = limits.incoming_app_message_queue_size ;
incoming_message_queue_size = limits.incoming_message_queue_size ;
outgoing_message_queue_size = limits.outgoing_message_queue_size ;
known_gids_history_size = limits.known_gids_history_size ;
known_points_history_size = limits.known_points_history_size ;
max_known_points = limits.max_known_points ;
max_known_gids = limits.max_known_gids ;
}
in
let pool =

View File

@ -32,6 +32,7 @@ module Stat = P2p_types.Stat
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float
}
type 'msg app_message_encoding = Encoding : {
@ -114,6 +115,13 @@ type limits = {
outgoing_message_queue_size : int option ;
(** Various bounds for internal queues. *)
known_gids_history_size : int ;
known_points_history_size : int ;
(** Size of circular log buffers, in number of events recorded. *)
max_known_gids : (int * int) option ;
max_known_points : (int * int) option ;
(** Optional limitation of internal hashtables (max, target) *)
}
type ('msg, 'meta) t

View File

@ -144,11 +144,16 @@ type config = {
incoming_message_queue_size : int option ;
outgoing_message_queue_size : int option ;
known_gids_history_size : int ;
known_points_history_size : int ;
max_known_points : (int * int) option ; (* max, gc target *)
max_known_gids : (int * int) option ; (* max, gc target *)
}
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float;
}
type 'msg message_config = {
@ -190,19 +195,81 @@ and ('msg, 'meta) connection = {
type ('msg, 'meta) pool = ('msg, 'meta) t
module GcPointSet = Utils.Bounded(struct
type t = Time.t * Point.t
let compare (x, _) (y, _) = - (Time.compare x y)
end)
let gc_points { config = { max_known_points } ; known_points } =
match max_known_points with
| None -> ()
| Some (_, target) ->
let now = Time.now () in (* TODO: maybe time of discovery? *)
let table = GcPointSet.create target in
Point.Table.iter (fun p pi ->
if Point_info.State.is_disconnected pi then
let time =
match Point_info.last_miss pi with
| None -> now
| Some t -> t in
GcPointSet.insert (time, p) table
) known_points ;
let to_remove = GcPointSet.get table in
ListLabels.iter to_remove ~f:begin fun (_, p) ->
Point.Table.remove known_points p
end
let register_point pool ?trusted (addr, port as point) =
match Point.Table.find pool.known_points point with
| exception Not_found ->
let pi = Point_info.create ?trusted addr port in
iter_option pool.config.max_known_points ~f:begin fun (max, _) ->
if Point.Table.length pool.known_points >= max then gc_points pool
end ;
Point.Table.add pool.known_points point pi ;
pi
| pi -> pi
(* Bounded table used to garbage collect gid infos when needed. The
strategy used is to remove the info of the gid with the lowest
score first. In case of equality, the info of the most recent added
gid is removed. The rationale behind this choice is that in the
case of a flood attack, the newly added infos will probably belong
to gids with the same (low) score and removing the most recent ones
ensure that older (and probably legit) gid infos are kept. *)
module GcGidSet = Utils.Bounded(struct
type t = float * Time.t * Gid.t
let compare (s, t, _) (s', t', _) =
let score_cmp = Pervasives.compare s s' in
if score_cmp = 0 then Time.compare t t' else - score_cmp
end)
let gc_gids { meta_config = { score } ;
config = { max_known_gids } ;
known_gids ; } =
match max_known_gids with
| None -> ()
| Some (_, target) ->
let table = GcGidSet.create target in
Gid.Table.iter (fun gid gid_info ->
let created = Gid_info.created gid_info in
let score = score @@ Gid_info.metadata gid_info in
GcGidSet.insert (score, created, gid) table
) known_gids ;
let to_remove = GcGidSet.get table in
ListLabels.iter to_remove ~f:begin fun (_, _, gid) ->
Gid.Table.remove known_gids gid
end
let register_peer pool gid =
match Gid.Table.find pool.known_gids gid with
| exception Not_found ->
Lwt_condition.broadcast pool.events.new_point () ;
let peer = Gid_info.create gid ~metadata:pool.meta_config.initial in
iter_option pool.config.max_known_gids ~f:begin fun (max, _) ->
if Gid.Table.length pool.known_gids >= max then gc_gids pool
end ;
Gid.Table.add pool.known_gids gid peer ;
peer
| peer -> peer

View File

@ -92,11 +92,28 @@ type config = {
outgoing_message_queue_size : int option ;
(** Size of the outgoing message queue internal to a peer's Writer
(See [P2p_connection.accept]). *)
known_gids_history_size : int ;
(** Size of the known gids log buffer (default: 50) *)
known_points_history_size : int ;
(** Size of the known points log buffer (default: 50) *)
max_known_points : (int * int) option ;
(** Parameters for the the garbage collection of known points. If
None, no garbage collection is performed. Otherwise, the first
integer of the couple limits the size of the "known points"
table. When this number is reached, the table is expurged from
disconnected points, older first, to try to reach the amount of
connections indicated by the second integer. *)
max_known_gids : (int * int) option ;
(** Like [max_known_points], but for known gids. *)
}
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float;
}
type 'msg message_config = {

View File

@ -283,14 +283,15 @@ module Gid_info = struct
type ('conn, 'meta) t = {
gid : Gid.t ;
created : Time.t ;
mutable state : 'conn state ;
mutable metadata : 'meta ;
mutable trusted : bool ;
events : Event.t Ring.t ;
mutable last_failed_connection : (Id_point.t * Time.t) option ;
mutable last_rejected_connection : (Id_point.t * Time.t) option ;
mutable last_established_connection : (Id_point.t * Time.t) option ;
mutable last_disconnection : (Id_point.t * Time.t) option ;
events : Event.t Ring.t ;
}
type ('conn, 'meta) gid_info = ('conn, 'meta) t
@ -298,42 +299,44 @@ module Gid_info = struct
let log_size = 100
let create ?(trusted = false) ~metadata gid =
let create ?(created = Time.now ()) ?(trusted = false) ~metadata gid =
{ gid ;
created ;
state = Disconnected ;
metadata ;
trusted ;
events = Ring.create log_size ;
last_failed_connection = None ;
last_rejected_connection = None ;
last_established_connection = None ;
last_disconnection = None ;
events = Ring.create log_size ;
}
let encoding metadata_encoding =
let open Data_encoding in
conv
(fun { gid ; trusted ; metadata ; events ;
(fun { gid ; trusted ; metadata ; events ; created ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection } ->
(gid, trusted, metadata, Ring.elements events,
(gid, created, trusted, metadata, Ring.elements events,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection))
(fun (gid, trusted, metadata, event_list,
(fun (gid, created, trusted, metadata, event_list,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection) ->
let info = create ~trusted ~metadata gid in
let events = Ring.create log_size in
Ring.add_list info.events event_list ;
{ state = Disconnected ;
trusted ; gid ; metadata ; events ;
{ gid ; created ; state = Disconnected ;
trusted ; metadata ; events ;
last_failed_connection ;
last_rejected_connection ;
last_established_connection ;
last_disconnection ;
})
(obj8
(obj9
(req "gid" Gid.encoding)
(req "created" Time.encoding)
(dft "trusted" bool false)
(req "metadata" metadata_encoding)
(dft "events" (list Event.encoding) [])
@ -347,6 +350,7 @@ module Gid_info = struct
(tup2 Id_point.encoding Time.encoding)))
let gid { gid } = gid
let created { created } = created
let metadata { metadata } = metadata
let set_metadata gi metadata = gi.metadata <- metadata
let trusted { trusted } = trusted

View File

@ -58,6 +58,12 @@ module Point_info : sig
val last_miss :
'conn point_info -> Time.t option
(** [last_miss pi] is the most recent of:
* last failed connection
* last rejected connection
* last disconnection
*)
val greylisted :
?now:Time.t -> 'conn point_info -> bool
@ -145,6 +151,7 @@ module Gid_info : sig
val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int
val create :
?created:Time.t ->
?trusted:bool ->
metadata:'meta ->
Gid.t -> ('conn, 'meta) gid_info
@ -153,6 +160,7 @@ module Gid_info : sig
val gid : ('conn, 'meta) gid_info -> Gid.t
val created : ('conn, 'meta) gid_info -> Time.t
val metadata : ('conn, 'meta) gid_info -> 'meta
val set_metadata : ('conn, 'meta) gid_info -> 'meta -> unit

View File

@ -97,6 +97,7 @@ end
let meta_cfg : _ P2p.meta_config = {
P2p.encoding = Metadata.encoding ;
initial = Metadata.initial ;
score = Metadata.score
}
and msg_cfg : _ P2p.message_config = {

View File

@ -33,6 +33,7 @@ type metadata = unit
let meta_config : metadata P2p_connection_pool.meta_config = {
encoding = Data_encoding.empty ;
initial = () ;
score = fun () -> 0. ;
}
let rec connect ~timeout pool point =
@ -128,6 +129,7 @@ let make_net points repeat n =
let point, points = Utils.select n points in
let proof_of_work_target = Crypto_box.make_target 0. in
let identity = Identity.generate proof_of_work_target in
let nb_points = List.length points in
let config = P2p_connection_pool.{
identity ;
proof_of_work_target ;
@ -135,13 +137,17 @@ let make_net points repeat n =
peers_file = "/dev/null" ;
closed_network = true ;
listening_port = Some (snd point) ;
min_connections = List.length points ;
max_connections = List.length points ;
max_incoming_connections = List.length points ;
min_connections = nb_points ;
max_connections = nb_points ;
max_incoming_connections = nb_points ;
authentification_timeout = 2. ;
incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
known_gids_history_size = 100 ;
known_points_history_size = 100 ;
max_known_points = None ;
max_known_gids = None ;
} in
Process.detach
~prefix:(Format.asprintf "%a " Gid.pp identity.gid)