From 02838a4cbce68fe946958aee5d7ab957cdf0cf57 Mon Sep 17 00:00:00 2001 From: Pietro Date: Thu, 22 Feb 2018 15:35:50 +0100 Subject: [PATCH] p2p,node,client: Add Greylists - add admin commands to ban and unban ips and peers - add greylist_timeout option to configuration file (node) - Add greylist modules + RPC --- .gitlab-ci.yml | 15 ++ docs/whitedoc/p2p.rst | 31 ++- src/bin_client/main_client.ml | 1 - src/bin_client/test/test_multinode.sh | 2 +- src/bin_node/node_config_file.ml | 28 ++- src/lib_base/p2p_addr.ml | 20 ++ src/lib_base/p2p_addr.mli | 7 + src/lib_base/time.mli | 2 + .../client_p2p_commands.ml | 124 +++++++++- src/lib_p2p/p2p.ml | 93 +++++++- src/lib_p2p/p2p.mli | 6 + src/lib_p2p/p2p_acl.ml | 223 ++++++++++++++++++ src/lib_p2p/p2p_acl.mli | 100 ++++++++ src/lib_p2p/p2p_maintenance.ml | 14 +- src/lib_p2p/p2p_maintenance.mli | 6 +- src/lib_p2p/p2p_pool.ml | 143 ++++++++--- src/lib_p2p/p2p_pool.mli | 19 ++ src/lib_p2p/test/jbuild | 30 ++- src/lib_p2p/test/test_p2p_banned_peers.ml | 66 ++++++ src/lib_p2p/test/test_p2p_ipv6set.ml | 139 +++++++++++ src/lib_p2p/test/test_p2p_peerset.ml | 57 +++++ src/lib_p2p/test/test_p2p_pool.ml | 1 + src/lib_shell/distributed_db.ml | 15 +- src/lib_shell/distributed_db.mli | 3 + src/lib_shell/peer_validator.ml | 4 +- src/lib_shell_services/p2p_errors.ml | 30 ++- src/lib_shell_services/p2p_services.ml | 120 ++++++++-- src/lib_shell_services/p2p_services.mli | 72 ++++++ src/lib_stdlib/ring.ml | 177 +++++++++----- src/lib_stdlib/ring.mli | 32 ++- 30 files changed, 1441 insertions(+), 139 deletions(-) create mode 100644 src/lib_p2p/p2p_acl.ml create mode 100644 src/lib_p2p/p2p_acl.mli create mode 100644 src/lib_p2p/test/test_p2p_banned_peers.ml create mode 100644 src/lib_p2p/test/test_p2p_ipv6set.ml create mode 100644 src/lib_p2p/test/test_p2p_peerset.ml diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c8abc2b82..f85936c09 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -161,6 +161,21 @@ test:proto_alpha: script: - jbuilder build @src/proto_alpha/lib_protocol/runtest +test:p2p:peerset: + <<: *test_definition + script: + - jbuilder build @src/lib_p2p/runtest_p2p_peerset + +test:p2p:ipv6set: + <<: *test_definition + script: + - jbuilder build @src/lib_p2p/runtest_p2p_ipv6set + +test:p2p:banned_peers: + <<: *test_definition + script: + - jbuilder build @src/lib_p2p/runtest_p2p_banned_peers + # test:client_alpha:transaction: # <<: *test_definition # script: diff --git a/docs/whitedoc/p2p.rst b/docs/whitedoc/p2p.rst index 565433cc0..77bffaeeb 100644 --- a/docs/whitedoc/p2p.rst +++ b/docs/whitedoc/p2p.rst @@ -28,12 +28,15 @@ General operation I/O Scheduling ~~~~~~~~~~~~~~ -The P2P layer uses I/O scheduling in order to be able to control its -bandwidth usage as well as implementing different policies -(e.g. read/write quotas) to different peers. For now, each peer is -granted a fair share of the global allocated bandwidth, but it is -planned for the individual allocated bandwidth to each peer to be a -function of the peer's score. +The P2P layer uses a scheduling mechanism in order to be able to control its +bandwidth usage as well as implementing different policies (e.g. read/write +quotas) to different peers. For now, each peer is granted a fair share of the +global allocated bandwidth, but it is planned for the individual allocated +bandwidth to each peer to be a function of the peer's score. Each connection is +associated tp a read/write queues where data is copied at a rate of equivalent +to max_download_speed / num_connections (resp. max_upload_speed / +num_connections). + Encryption ~~~~~~~~~~ @@ -83,6 +86,19 @@ layer. It basically runs the ``accept(2)`` syscall and call that it is made aware of an incoming connection. From there, the pool will decide how this new connection must be handled. +BlackLists, WhiteLists, GreyLists +-------------------------------- + +The welcome worker takes care of filtering all incoming connections using two +static lists of address provided by the admin (either via the ``tezos-admin`` +client or directly in the configuration file) and an automatic (grey)list +handled automatically by the p2p layer. The node admin can block or whitelist +individual ip addresses, while the p2p layer can temporarly ban ip addresses and +peer's identities that misbihaved. The delay to remove an ip address from +the greylist table is defined by the varaible ``greylist_timeout``, while +identities are greylisted in a fixed size ring and periodically removed. The +node admin can also flush the greylist tables using the ``tezos-admin`` client. + Maintenance worker ------------------ @@ -111,4 +127,5 @@ Given these bounds, the maintenance worker: peers until it reaches at least ``min_target`` connections (and never more than ``max_target`` connections). - +The maintenance worker is also in charge to periodically run the greylists +GC functions to unban ip addresses from the greylist. diff --git a/src/bin_client/main_client.ml b/src/bin_client/main_client.ml index 65605461d..c596ca115 100644 --- a/src/bin_client/main_client.ml +++ b/src/bin_client/main_client.ml @@ -42,7 +42,6 @@ let get_commands_for_version ctxt block protocol = let select_commands ctxt { block ; protocol } = get_commands_for_version ctxt block protocol >>|? fun (_, commands_for_version) -> Client_rpc_commands.commands @ - Client_p2p_commands.commands () @ Client_keys_commands.commands () @ Client_helpers_commands.commands () @ commands_for_version diff --git a/src/bin_client/test/test_multinode.sh b/src/bin_client/test/test_multinode.sh index f213229cc..2720c68a8 100755 --- a/src/bin_client/test/test_multinode.sh +++ b/src/bin_client/test/test_multinode.sh @@ -22,7 +22,7 @@ for client in "${client_instances[@]}"; do echo "### $client p2p stat" echo $client bootstrapped - $client p2p stat + $admin_client network stat echo done diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index bb8a8cfcd..18d70a4df 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -62,6 +62,7 @@ and shell = { let default_p2p_limits : P2p.limits = { connection_timeout = 10. ; authentication_timeout = 5. ; + greylist_timeout = 86400. ; (* one day *) min_connections = 10 ; expected_connections = 50 ; max_connections = 100 ; @@ -159,7 +160,7 @@ let default_config = { let limit : P2p.limits Data_encoding.t = let open Data_encoding in conv - (fun { P2p.connection_timeout ; authentication_timeout ; + (fun { P2p.connection_timeout ; authentication_timeout ; greylist_timeout ; min_connections ; expected_connections ; max_connections ; backlog ; max_incoming_connections ; max_download_speed ; max_upload_speed ; @@ -170,8 +171,8 @@ let limit : P2p.limits Data_encoding.t = max_known_points ; max_known_peer_ids ; swap_linger ; binary_chunks_size } -> - (((( connection_timeout, - authentication_timeout, min_connections, expected_connections, + (((( connection_timeout, authentication_timeout, + min_connections, expected_connections, max_connections, backlog, max_incoming_connections, max_download_speed, max_upload_speed, swap_linger), ( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size, @@ -179,9 +180,9 @@ let limit : P2p.limits Data_encoding.t = incoming_message_queue_size, outgoing_message_queue_size, known_points_history_size, known_peer_ids_history_size, max_known_points)), - max_known_peer_ids))) - (fun (((( connection_timeout, - authentication_timeout, min_connections, expected_connections, + ( max_known_peer_ids, greylist_timeout)))) + (fun (((( connection_timeout, authentication_timeout, + min_connections, expected_connections, max_connections, backlog, max_incoming_connections, max_download_speed, max_upload_speed, swap_linger), ( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size, @@ -189,8 +190,9 @@ let limit : P2p.limits Data_encoding.t = incoming_message_queue_size, outgoing_message_queue_size, known_points_history_size, known_peer_ids_history_size, max_known_points)), - max_known_peer_ids)) -> - { connection_timeout ; authentication_timeout ; min_connections ; expected_connections ; + ( max_known_peer_ids, greylist_timeout))) -> + { connection_timeout ; authentication_timeout ; greylist_timeout ; + min_connections ; expected_connections ; max_connections ; backlog ; max_incoming_connections ; max_download_speed ; max_upload_speed ; read_buffer_size ; read_queue_size ; write_queue_size ; @@ -271,8 +273,14 @@ let limit : P2p.limits Data_encoding.t = default_p2p_limits.known_points_history_size) (opt "max_known_points" (tup2 uint16 uint16)) )) - (obj1 - (opt "max_known_peer_ids" (tup2 uint16 uint16)))) + (obj2 + (opt "max_known_peer_ids" (tup2 uint16 uint16)) + (dft "greylist-timeout" + (Data_encoding.describe + ~description: "GC delay for the greylists tables, in seconds." + float) default_p2p_limits.greylist_timeout) + + )) let p2p = let open Data_encoding in diff --git a/src/lib_base/p2p_addr.ml b/src/lib_base/p2p_addr.ml index 07e421a86..0a36c45e7 100644 --- a/src/lib_base/p2p_addr.ml +++ b/src/lib_base/p2p_addr.ml @@ -26,3 +26,23 @@ let encoding = end type port = int + +let pp ppf addr = + match Ipaddr.v4_of_v6 addr with + | Some addr -> + Format.fprintf ppf "%a" Ipaddr.V4.pp_hum addr + | None -> + Format.fprintf ppf "[%a]" Ipaddr.V6.pp_hum addr + +let of_string_exn str = + match Ipaddr.of_string_exn str with + | V4 addr -> Ipaddr.v6_of_v4 addr + | V6 addr -> addr + +let of_string str = + try Ok (of_string_exn str) with + | Invalid_argument s -> Error s + | Failure s -> Error s + | _ -> Error "P2p_addr.of_string" + +let to_string saddr = Format.asprintf "%a" pp saddr diff --git a/src/lib_base/p2p_addr.mli b/src/lib_base/p2p_addr.mli index 616177775..5db8a8bd9 100644 --- a/src/lib_base/p2p_addr.mli +++ b/src/lib_base/p2p_addr.mli @@ -11,3 +11,10 @@ type t = Ipaddr.V6.t type port = int val encoding : t Data_encoding.t + +val pp : Format.formatter -> t -> unit + +val of_string : string -> (t, string) result +val of_string_exn : string -> t + +val to_string : t -> string diff --git a/src/lib_base/time.mli b/src/lib_base/time.mli index eb7ac7631..1346e17b3 100644 --- a/src/lib_base/time.mli +++ b/src/lib_base/time.mli @@ -10,6 +10,8 @@ type t include Compare.S with type t := t +val hash : t -> int + val min_value : t val epoch : t val max_value : t diff --git a/src/lib_client_commands/client_p2p_commands.ml b/src/lib_client_commands/client_p2p_commands.ml index 5d08fcae1..2c602ff1b 100644 --- a/src/lib_client_commands/client_p2p_commands.ml +++ b/src/lib_client_commands/client_p2p_commands.ml @@ -11,8 +11,24 @@ let group = { Clic.name = "p2p" ; title = "Commands for monitoring and controlling p2p-layer state" } -let commands () = [ +let port_arg () = let open Clic in + default_arg + ~long:"port" + ~placeholder:"number" + ~doc:"peer-to-peer port of the node" + ~default: "9732" + (parameter + (fun _ x -> try + return (int_of_string x) + with Failure _ -> + failwith "Invalid peer-to-peer port")) + +let commands () = + let open Clic in + let addr_parameter = parameter (fun _ x -> return (P2p_addr.of_string_exn x)) in + [ + command ~group ~desc: "show global network status" no_options (prefixes ["p2p" ; "stat"] stop) begin fun () (cctxt : #Client_context.full) -> @@ -65,5 +81,109 @@ let commands () = [ (if pi.trusted then "★" else " ") end points >>= fun () -> return () - end + end ; + + command ~group ~desc: "Connect to a new point." + (args1 (port_arg ())) + (prefixes [ "connect" ; "address" ] + @@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter + @@ stop) + (fun port address (cctxt : #Client_context.full) -> + P2p_services.connect cctxt ~timeout:10. (address,port) >>=? fun () -> + return () + ); + + command ~group ~desc: "Remove an IP address from the blacklist and whitelist." + no_options + (prefixes [ "forget" ; "address" ] + @@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter + @@ stop) + (fun () address (cctxt : #Client_context.full) -> + P2p_services.Points.forget cctxt (address,0) >>=? fun () -> + return () + ); + + command ~group ~desc: "Add a IP address to the blacklist." + no_options + (prefixes [ "ban" ; "address" ] + @@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter + @@ stop) + (fun () address (cctxt : #Client_context.full) -> + P2p_services.Points.ban cctxt (address,0) >>=? fun () -> + return () + ); + + command ~group ~desc: "Add a IP address to the whitelist." + no_options + (prefixes [ "trust" ; "address" ] + @@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter + @@ stop) + (fun () address (cctxt : #Client_context.full) -> + P2p_services.Points.trust cctxt (address,0) >>=? fun () -> + return () + ); + + command ~group ~desc: "Check if an IP address is banned." + no_options + (prefixes [ "is" ; "address" ; "banned" ] + @@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter + @@ stop) + (fun () address (cctxt : #Client_context.full) -> + P2p_services.Points.is_banned cctxt (address,0) >>=? fun answer -> + cctxt#message + "The given ip address is %s" + (if answer then "banned" else "not banned") >>= fun () -> + return () + ); + + command ~group ~desc: "Remove a peer ID from the blacklist and whitelist." + no_options + (prefixes [ "forget" ; "peer" ] + @@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity" + @@ stop) + (fun () peer (cctxt : #Client_context.full) -> + P2p_services.Peers.forget cctxt peer >>=? fun () -> + return () + ); + + command ~group ~desc: "Add a peer ID to the blacklist." + no_options + (prefixes [ "ban" ; "peer" ] + @@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity" + @@ stop) + (fun () peer (cctxt : #Client_context.full) -> + P2p_services.Peers.ban cctxt peer >>=? fun () -> + return () + ); + + command ~group ~desc: "Add a peer ID to the whitelist." + no_options + (prefixes [ "trust" ; "peer" ] + @@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity" + @@ stop) + (fun () peer (cctxt : #Client_context.full) -> + P2p_services.Peers.trust cctxt peer >>=? fun () -> + return () + ); + + command ~group ~desc: "Check if a peer ID is banned." + no_options + (prefixes [ "is" ; "peer" ; "banned" ] + @@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity" + @@ stop) + (fun () peer (cctxt : #Client_context.full) -> + P2p_services.Peers.is_banned cctxt peer >>=? fun answer -> + cctxt#message + "The given peer ID is %s" + (if answer then "banned" else "not banned") >>= fun () -> + return () + ); + + command ~group ~desc: "Clear all greylist tables." + no_options + (prefixes [ "clear" ; "greylists" ] @@ stop) + (fun () (cctxt : #Client_context.full) -> + P2p_services.Greylist.clear cctxt () >>=? fun () -> + return () + ); ] diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 985e5d243..e2c4f09c5 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -43,6 +43,7 @@ type limits = { connection_timeout : float ; authentication_timeout : float ; + greylist_timeout : float ; min_connections : int ; expected_connections : int ; @@ -97,6 +98,7 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched = max_incoming_connections = limits.max_incoming_connections ; connection_timeout = limits.connection_timeout ; authentication_timeout = limits.authentication_timeout ; + greylist_timeout = limits.greylist_timeout ; incoming_app_message_queue_size = limits.incoming_app_message_queue_size ; incoming_message_queue_size = limits.incoming_message_queue_size ; outgoing_message_queue_size = limits.outgoing_message_queue_size ; @@ -132,7 +134,7 @@ let create_maintenance_worker limits pool = ~expected:limits.expected_connections ~max:limits.max_connections in - P2p_maintenance.run bounds pool + P2p_maintenance.run bounds ~greylist_timeout:limits.greylist_timeout pool let may_create_welcome_worker config limits pool = match config.listening_port with @@ -444,6 +446,16 @@ let fold_connections net = net.fold_connections let iter_connections net = net.iter_connections let on_new_connection net = net.on_new_connection +let temp_ban_peer net peer_id = + match net.pool with + |None -> () + |Some pool -> P2p_pool.temp_ban_peer pool peer_id + +let temp_ban_addr net addr = + match net.pool with + |None -> () + |Some pool -> P2p_pool.temp_ban_addr pool addr + module Raw = struct type 'a t = 'a P2p_pool.Message.t = | Bootstrap @@ -536,6 +548,9 @@ let build_rpc_directory net = match net.pool with | None -> failwith "The node has disable the P2P layer." | Some pool -> + fail_unless + (not(P2p_pool.Points.is_banned pool point)) + (P2p_errors.Point_banned point) >>=? fun () -> ignore (P2p_pool.connect ~timeout pool point : _ tzresult Lwt.t) ; return () end in @@ -638,6 +653,37 @@ let build_rpc_directory net = RPC_answer.return_stream { next ; shutdown } end in + let dir = + RPC_directory.gen_register1 dir P2p_services.Peers.S.ban + begin fun peer_id () () -> + match net.pool with + | None -> RPC_answer.not_found + | Some pool -> + P2p_pool.Peers.unset_trusted pool peer_id; + P2p_pool.Peers.ban pool peer_id ; + RPC_answer.return () + end in + + let dir = + RPC_directory.gen_register1 dir P2p_services.Peers.S.trust + begin fun peer_id () () -> + match net.pool with + | None -> RPC_answer.not_found + | Some pool -> + P2p_pool.Peers.set_trusted pool peer_id ; + RPC_answer.return () + end in + + let dir = + RPC_directory.register1 dir P2p_services.Peers.S.is_banned + begin fun peer_id () () -> + match net.pool with + | None -> return false + | Some pool -> + if P2p_pool.Peers.get_trusted pool peer_id then return false + else return (P2p_pool.Peers.is_banned pool peer_id) + end in + (* Network : Point *) let dir = @@ -700,4 +746,49 @@ let build_rpc_directory net = RPC_answer.return_stream { next ; shutdown } end in + let dir = + RPC_directory.gen_register1 dir P2p_services.Points.S.ban + begin fun point () () -> + match net.pool with + | None -> RPC_answer.not_found + | Some pool -> + P2p_pool.Points.unset_trusted pool point; + P2p_pool.Points.ban pool point; + RPC_answer.return () + end in + + let dir = + RPC_directory.gen_register1 dir P2p_services.Points.S.trust + begin fun point () () -> + match net.pool with + | None -> RPC_answer.not_found + | Some pool -> + P2p_pool.Points.set_trusted pool point ; + RPC_answer.return () + end in + + let dir = + RPC_directory.gen_register1 dir P2p_services.Points.S.is_banned + begin fun point () () -> + match net.pool with + | None -> RPC_answer.return false + | Some pool -> + if P2p_pool.Points.get_trusted pool point then + RPC_answer.return false + else + RPC_answer.return (P2p_pool.Points.is_banned pool point) + end in + + (* Network : Greylist *) + + let dir = + RPC_directory.register dir P2p_services.Greylist.S.clear + begin fun () () () -> + match net.pool with + | None -> return () + | Some pool -> + P2p_pool.greylist_clear pool ; + return () + end in + dir diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index d28725507..c50793040 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -73,6 +73,9 @@ type limits = { authentication_timeout : float ; (** Delay granted to a peer to perform authentication, in seconds. *) + greylist_timeout : float ; + (** GC delay for the grelists tables, in seconds. *) + min_connections : int ; (** Strict minimum number of connections (triggers an urgent maintenance) *) @@ -210,6 +213,9 @@ val on_new_connection : val build_rpc_directory : _ t -> unit RPC_directory.t +val temp_ban_addr : ('msg, 'meta) net -> P2p_addr.t -> unit +val temp_ban_peer : ('msg, 'meta) net -> P2p_peer.Id.t -> unit + (**/**) module Raw : sig diff --git a/src/lib_p2p/p2p_acl.ml b/src/lib_p2p/p2p_acl.ml new file mode 100644 index 000000000..674ff565f --- /dev/null +++ b/src/lib_p2p/p2p_acl.ml @@ -0,0 +1,223 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module PeerRing = Ring.MakeTable(struct + include P2p_peer.Id + let hash = Hashtbl.hash + end) + +module PatriciaTree(V:HashPtree.Value) = struct + module Size = struct + let size = 128 + end + module Bits = HashPtree.Bits(Size) + module M = HashPtree.Make_BE_sized(V)(Size) + + type t = M.t + let empty = M.empty + + (* take into consideration the fact that the int64 + * returned by Ipaddr.V6.to_int64 is signed *) + let z_of_bytes i = + let i = Z.of_int64 i in + Z.(if i < zero then i + of_int 2 ** 64 else i) + + let z_of_ipv6 ip = + let hi_x, lo_x = Ipaddr.V6.to_int64 ip in + let hi = z_of_bytes hi_x in + let lo = z_of_bytes lo_x in + Z.((hi lsl 64) + lo) + + let key_of_ipv6 ip = + Bits.of_z (z_of_ipv6 ip) + + let z_mask_of_ipv6_prefix p = + let ip = Ipaddr.V6.Prefix.network p in + let len = Ipaddr.V6.Prefix.bits p in + z_of_ipv6 ip, Z.(lsl) Z.one (128 - len) + + let key_mask_of_ipv6_prefix p = + let z, m = z_mask_of_ipv6_prefix p in + Bits.of_z z, Bits.of_z m + + let z_to_ipv6 z = + (* assumes z is a 128 bit value *) + let hi_z = Z.(z asr 64) in + let hi = + if Z.(hi_z >= of_int 2 ** 63) then + (* If overflows int64, then returns the bit equivalent + representation (which is negative) *) + Int64.add 0x8000000000000000L + ((Z.(to_int64 (hi_z - (of_int 2 ** 63))))) + else + Z.(to_int64 hi_z) + in + let lo = Z.(to_int64 (z mod (pow ~$2 64))) in + Ipaddr.V6.of_int64 (hi, lo) + + let remove key t = + M.remove (key_of_ipv6 key) t + + let remove_prefix prefix t = + let key, mask = key_mask_of_ipv6_prefix prefix in + M.remove_prefix key mask t + + let add_prefix prefix value t = + let key, mask = key_mask_of_ipv6_prefix prefix in + M.add (fun _ v -> v) ~key ~value ~mask t + + let add key value t = + let key = key_of_ipv6 key in + M.add (fun _ v -> v) ~key ~value t + + let mem key t = M.mem (key_of_ipv6 key) t + + let key_mask_to_prefix key mask = + let len = + if Bits.(equal mask zero) then 0 + else 128 - (Z.trailing_zeros (Bits.to_z mask)) + in + Ipaddr.V6.Prefix.make len (z_to_ipv6 (Bits.to_z key)) + + let fold f t acc = + let f key mask value acc = + let prefix = key_mask_to_prefix key mask in + f prefix value acc + in + M.fold f t acc + + let pp ppf t = + let lst = fold (fun p _ l -> p :: l) t [] in + Format.fprintf ppf "@[<2>[%a]@]" + Format.(pp_print_list ~pp_sep:(fun ppf () -> fprintf ppf ";@ ") + Ipaddr.V6.Prefix.pp_hum) + lst + +end + +(* patricia trees using IpV6 addresses as keys *) +module IpSet = struct + + include PatriciaTree(Time) + + let gc t ~delay = + let timenow = Time.now() in + let module MI = + struct + type result = Time.t + let default = Time.now() + let map _t _key value = value + let reduce _t left right = Time.(max left right) + end + in + let module MR = M.Map_Reduce(MI) in + MR.filter (fun addtime -> + Time.(timenow < (add addtime (Int64.of_float delay))) + ) t + +end + +module IpTable = Hashtbl.Make(struct + type t = Ipaddr.V6.t + let hash = Hashtbl.hash + let equal x y = Ipaddr.V6.compare x y = 0 + end) + +type raw = { + mutable greylist_ips : IpSet.t ; + greylist_peers : PeerRing.t ; + banned_ips : unit IpTable.t ; + banned_peers : unit P2p_peer.Table.t ; +} + +type t = raw ref + +let create size = ref { + greylist_ips = IpSet.empty; + greylist_peers = PeerRing.create size; + banned_ips = IpTable.create 53; + banned_peers = P2p_peer.Table.create 53; + } + +(* check if an ip is banned. priority is for static blacklist, then + in the greylist *) +let is_banned_addr acl addr = + (IpTable.mem !acl.banned_ips addr) || + (IpSet.mem addr !acl.greylist_ips) + +(* Check is the peer_id is in the banned ring. It might be possible that + a peer ID that is not banned, but its ip address is. *) +let is_banned_peer acl peer_id = + (P2p_peer.Table.mem !acl.banned_peers peer_id) || + (PeerRing.mem !acl.greylist_peers peer_id) + +let greylist_clear acl = + !acl.greylist_ips <- IpSet.empty; + P2p_peer.Table.clear !acl.banned_peers; + IpTable.clear !acl.banned_ips; + PeerRing.clear !acl.greylist_peers + +module IPGreylist = struct + + (* Add the given ip address to the ip greylist *) + let add acl addr = + !acl.greylist_ips <- IpSet.add addr (Time.now()) !acl.greylist_ips + + let mem acl addr = IpSet.mem addr !acl.greylist_ips + + (* The GC operation works only on the address set. Peers are removed + from the ring in a round-robin fashion. If a address is removed + by the GC from the acl.greylist set, it could potentially + persist in the acl.peers set until more peers are banned. *) + let gc acl ~delay = + !acl.greylist_ips <- IpSet.gc !acl.greylist_ips ~delay + + let encoding = Data_encoding.(list P2p_addr.encoding) + +end + +module IPBlacklist = struct + + (* Add the given ip address to the ip blacklist *) + let add acl addr = + IpTable.add !acl.banned_ips addr () + + (* Remove the given ip address to the ip blacklist *) + let remove acl addr = + IpTable.remove !acl.banned_ips addr + + let mem acl addr = + IpTable.mem !acl.banned_ips addr + +end + +module PeerBlacklist = struct + + let add acl addr = + P2p_peer.Table.add !acl.banned_peers addr () + + let remove acl addr = + P2p_peer.Table.remove !acl.banned_peers addr + + let mem acl addr = + P2p_peer.Table.mem !acl.banned_peers addr + +end + +module PeerGreylist = struct + + (* Ban the given peer_id. It also add the given ip address to the blacklist. *) + let add acl peer_id = + PeerRing.add !acl.greylist_peers peer_id + + let mem acl peer_id = + (PeerRing.mem !acl.greylist_peers peer_id) + +end + diff --git a/src/lib_p2p/p2p_acl.mli b/src/lib_p2p/p2p_acl.mli new file mode 100644 index 000000000..686b11cf2 --- /dev/null +++ b/src/lib_p2p/p2p_acl.mli @@ -0,0 +1,100 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** + This module implements four tables + - ip grey lists used to automatically ban a given ip addr + - peer_id greylist used to automatically ban a given peer_id + - ip black lists used to manually ban a given ip addr + - peers black list used to manually trust a given peer_id + + IP greylists use a time based GC to periodically remove entries from + the table, while peer_id grey lists are built using a ring structure, + where peers are removed from the table when removed from the fixed size + ring. Other tables are user defined and static. + +*) + + +type t + +(** Create a new ACL of given size *) +val create : int -> t + +(** Check if an address is banned either temporally or permanently *) +val is_banned_addr : t -> P2p_addr.t -> bool + +(** Check if a peer is banned either temporally or permanently *) +val is_banned_peer : t -> P2p_peer.Id.t -> bool + +(** Reinitialize the Greylist tables *) +val greylist_clear : t -> unit + +module IPGreylist : sig + + (* Add the given ip address to the ip greylist *) + val add: t -> P2p_addr.t -> unit + + (** [gc time] removes all banned peers older than the given time in seconds + The GC operation works only on the address set. Peers are removed + from the ring in a round-robin fashion. If a address is removed + by the GC from the greylist set, it could potentially + persist in the peers' blacklist set until more peers are banned. *) + val gc: t -> delay:float -> unit + + val encoding: P2p_addr.t list Data_encoding.t + +end + +module IPBlacklist : sig + + (* Add the given ip address to the ip blacklist *) + val add: t -> P2p_addr.t -> unit + + (* Remove the given ip address to the ip blacklist *) + val remove: t -> P2p_addr.t -> unit + +end + +module PeerBlacklist : sig + + (* Add the given ip address to the ip blacklist *) + val add: t -> P2p_peer.Id.t -> unit + + (* Remove the given ip address to the ip blacklist *) + val remove: t -> P2p_peer.Id.t -> unit + +end + + +module PeerGreylist : sig + + (* Ban the given peer_id. It also add the given ip address to the blacklist. *) + val add: t -> P2p_peer.Id.t -> unit + +end + +(** / *) + +module PeerRing : Ring.TABLE with type v = P2p_peer.Id.t + +module IpSet : sig + type t + val empty: t + val add : Ipaddr.V6.t -> Time.t -> t -> t + val add_prefix : Ipaddr.V6.Prefix.t -> Time.t -> t -> t + val remove : Ipaddr.V6.t -> t -> t + val remove_prefix : Ipaddr.V6.Prefix.t -> t -> t + val mem : Ipaddr.V6.t -> t -> bool + val fold: (Ipaddr.V6.Prefix.t -> Time.t -> 'a -> 'a) -> t -> 'a -> 'a + val pp : Format.formatter -> t -> unit + val gc : t -> delay:float -> t +end + +module IpTable : Hashtbl.S with type key = Ipaddr.V6.t diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index dac5243e7..8f863f660 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -20,6 +20,7 @@ type 'meta pool = Pool : ('msg, 'meta) P2p_pool.t -> 'meta pool type 'meta t = { canceler: Lwt_canceler.t ; + greylist_timeout: float; bounds: bounds ; pool: 'meta pool ; just_maintained: unit Lwt_condition.t ; @@ -29,8 +30,8 @@ type 'meta t = { (** Select [expected] points amongst the disconnected known points. It ignores points which are greylisted, or for which a connection - failed after [start_time]. It first selects points with the oldest - last tentative. *) + failed after [start_time] and the pointes that are banned. It + first selects points with the oldest last tentative. *) let connectable st start_time expected = let Pool pool = st.pool in let now = Time.now () in @@ -52,6 +53,7 @@ let connectable st start_time expected = match P2p_point_state.Info.last_miss pi with | Some last when Time.(start_time < last) || P2p_point_state.Info.greylisted ~now pi -> () + | _ when (P2p_pool.Points.is_banned pool point) -> () | last -> Bounded_point_info.insert (last, point) acc end @@ -87,10 +89,13 @@ let rec try_to_contact (min_to_contact - established) (max_to_contact - established) (** Do a maintenance step. It will terminate only when the number - of connections is between `min_threshold` and `max_threshold`. *) + of connections is between `min_threshold` and `max_threshold`. + Do a pass in the list of banned peers and remove all peers that + have been banned for more then xxx seconds *) let rec maintain st = let Pool pool = st.pool in let n_connected = P2p_pool.active_connections pool in + P2p_pool.gc_greylist pool ~delay:st.greylist_timeout; if n_connected < st.bounds.min_threshold then too_few_connections st n_connected else if st.bounds.max_threshold < n_connected then @@ -163,10 +168,11 @@ let rec worker_loop st = | Error [ Canceled ] -> Lwt.return_unit | Error _ -> Lwt.return_unit -let run bounds pool = +let run ~greylist_timeout bounds pool = let canceler = Lwt_canceler.create () in let st = { canceler ; + greylist_timeout ; bounds ; pool = Pool pool ; just_maintained = Lwt_condition.create () ; diff --git a/src/lib_p2p/p2p_maintenance.mli b/src/lib_p2p/p2p_maintenance.mli index 58c5b1025..f1e85b25d 100644 --- a/src/lib_p2p/p2p_maintenance.mli +++ b/src/lib_p2p/p2p_maintenance.mli @@ -37,9 +37,11 @@ type 'meta t (** Type of a maintenance worker. *) val run: + greylist_timeout:float -> bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t -(** [run bounds pool] is a maintenance worker for [pool] with - connection targets specified in [bounds]. *) +(** [run ~greylist_timeout bounds pool] is a maintenance worker for + [pool] with connection targets specified in [bounds] and greylist + GC frequency [greylist_timeout]. *) val maintain: 'meta t -> unit Lwt.t (** [maintain t] gives a hint to maintenance worker [t] that diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 66e614f92..6464075dd 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -176,6 +176,7 @@ type config = { max_incoming_connections : int ; connection_timeout : float ; authentication_timeout : float ; + greylist_timeout : float ; incoming_app_message_queue_size : int option ; incoming_message_queue_size : int option ; @@ -218,6 +219,7 @@ type ('msg, 'meta) t = { encoding : 'msg Message.t Data_encoding.t ; events : events ; watcher : P2p_connection.Pool_event.t Lwt_watcher.input ; + acl : P2p_acl.t; mutable new_connection_hook : (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) list ; mutable latest_accepted_swap : Time.t ; @@ -396,12 +398,76 @@ let broadcast_bootstrap_msg pool = (***************************************************************************) +(* this function duplicates bit of code from the modules below to avoid + creating mutually recurvive modules *) +let get_addr pool peer_id = + let info peer_id = + try Some (P2p_peer.Table.find pool.known_peer_ids peer_id) + with Not_found -> None + in + let find_by_peer_id peer_id = + Option.apply + (info peer_id) + ~f:(fun p -> + match P2p_peer_state.get p with + | Running { data } -> Some data + | _ -> None) + in + match find_by_peer_id peer_id with + |None -> None + |Some ci -> + let info = P2p_socket.info ci.conn in + Some(info.id_point) + +module Points = struct + + type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t + + let info { known_points } point = + try Some (P2p_point.Table.find known_points point) + with Not_found -> None + + let get_trusted pool point = + try P2p_point_state.Info.trusted (P2p_point.Table.find pool.known_points point) + with Not_found -> false + + let set_trusted pool point = + try + P2p_point_state.Info.set_trusted + (register_point pool pool.config.identity.peer_id point) + with Not_found -> () + + let unset_trusted pool point = + try P2p_point_state.Info.unset_trusted (P2p_point.Table.find pool.known_points point) + with Not_found -> () + + let fold_known pool ~init ~f = + P2p_point.Table.fold f pool.known_points init + + let fold_connected pool ~init ~f = + P2p_point.Table.fold f pool.connected_points init + + let is_banned pool point = + P2p_acl.is_banned_addr pool.acl (fst point) + + let ban pool point = + P2p_acl.IPBlacklist.add pool.acl (fst point) + + let trust pool point = + P2p_acl.IPBlacklist.remove pool.acl (fst point) + + let forget pool point = + unset_trusted pool point; (* remove from whitelist *) + P2p_acl.IPBlacklist.remove pool.acl (fst point) + +end + module Peers = struct type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t - let info { known_peer_ids } point = - try Some (P2p_peer.Table.find known_peer_ids point) + let info { known_peer_ids } peer_id = + try Some (P2p_peer.Table.find known_peer_ids peer_id) with Not_found -> None let get_metadata pool peer_id = @@ -432,35 +498,29 @@ module Peers = struct let fold_connected pool ~init ~f = P2p_peer.Table.fold f pool.connected_peer_ids init -end + let forget pool peer = + match get_addr pool peer with + |None -> () + |Some point -> + unset_trusted pool peer; (* remove from whitelist *) + P2p_acl.PeerBlacklist.remove pool.acl peer; + P2p_acl.IPBlacklist.remove pool.acl (fst point) -module Points = struct + let ban pool peer = + match get_addr pool peer with + |None -> () + |Some point -> + Points.ban pool point; + P2p_acl.PeerBlacklist.add pool.acl peer - type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t + let trust pool peer = + match get_addr pool peer with + |None -> () + |Some point -> + Points.trust pool point - let info { known_points } point = - try Some (P2p_point.Table.find known_points point) - with Not_found -> None - - let get_trusted pool point = - try P2p_point_state.Info.trusted (P2p_point.Table.find pool.known_points point) - with Not_found -> false - - let set_trusted pool point = - try - P2p_point_state.Info.set_trusted - (register_point pool pool.config.identity.peer_id point) - with Not_found -> () - - let unset_trusted pool peer_id = - try P2p_point_state.Info.unset_trusted (P2p_point.Table.find pool.known_points peer_id) - with Not_found -> () - - let fold_known pool ~init ~f = - P2p_point.Table.fold f pool.known_points init - - let fold_connected pool ~init ~f = - P2p_point.Table.fold f pool.connected_points init + let is_banned pool peer = + P2p_acl.is_banned_peer pool.acl peer end @@ -532,10 +592,24 @@ module Connection = struct end +let temp_ban_addr pool addr = + P2p_acl.IPGreylist.add pool.acl addr + +let temp_ban_peer pool peer = + match get_addr pool peer with + |None -> () + |Some point -> + temp_ban_addr pool (fst point); + P2p_acl.PeerGreylist.add pool.acl peer + +let greylist_clear pool = + P2p_acl.greylist_clear pool.acl + +let gc_greylist ~delay pool = P2p_acl.IPGreylist.gc ~delay pool.acl + let pool_stat { io_sched } = P2p_io_scheduler.global_stat io_sched - (***************************************************************************) let fail_unless_disconnected_point point_info = @@ -571,6 +645,9 @@ let compare_known_point_info p1 p2 = | true, true -> compare_last_seen p2 p1 let rec connect ?timeout pool point = + fail_unless + (not(Points.is_banned pool point)) + (P2p_errors.Point_banned point) >>=? fun () -> let timeout = Option.unopt ~default:pool.config.connection_timeout timeout in fail_unless @@ -645,6 +722,9 @@ and authenticate pool ?point_info canceler fd point = lwt_debug "authenticate: %a -> auth %a" P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> + fail_unless + (not(Peers.is_banned pool info.peer_id)) + (P2p_errors.Peer_banned info.peer_id) >>=? fun () -> let remote_point_info = match info.id_point with | addr, Some port @@ -901,7 +981,9 @@ and swap pool conn current_peer_id new_point = let accept pool fd point = log pool (Incoming_connection point) ; if pool.config.max_incoming_connections <= P2p_point.Table.length pool.incoming - || pool.config.max_connections <= active_connections pool then + || pool.config.max_connections <= active_connections pool + (* silently ignore banned points *) + || (P2p_acl.is_banned_addr pool.acl (fst point)) then Lwt.async (fun () -> Lwt_utils_unix.safe_close fd) else let canceler = Lwt_canceler.create () in @@ -947,6 +1029,7 @@ let create config meta_config message_config io_sched = encoding = Message.encoding message_config.encoding ; events ; watcher = Lwt_watcher.create_input () ; + acl = P2p_acl.create 1023; new_connection_hook = [] ; latest_accepted_swap = Time.epoch ; latest_succesfull_swap = Time.epoch ; diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index 7db6b658b..c3fbb5a5b 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -81,6 +81,9 @@ type config = { authentication_timeout : float ; (** Delay granted to a peer to perform authentication, in seconds. *) + greylist_timeout : float ; + (** GC delay for the grelists tables, in seconds. *) + incoming_app_message_queue_size : int option ; (** Size of the message queue for user messages (messages returned by this module's [read] function. *) @@ -174,6 +177,7 @@ module Pool_event : sig end + (** {1 Connections management} *) type ('msg, 'meta) connection @@ -266,6 +270,11 @@ val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit (** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap] for all member connections to [pool] in [Running] state. *) +val temp_ban_peer : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit +val temp_ban_addr : ('msg, 'meta) pool -> P2p_addr.t -> unit +val gc_greylist: delay:float -> ('msg, 'meta) pool -> unit +val greylist_clear : ('msg, 'meta) pool -> unit + (** {1 Functions on [Peer_id]} *) module Peers : sig @@ -295,6 +304,11 @@ module Peers : sig f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) -> 'a + val forget : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit + val ban : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit + val trust : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit + val is_banned : ('msg, 'meta) pool -> P2p_peer.Id.t -> bool + end (** {1 Functions on [Points]} *) @@ -322,6 +336,11 @@ module Points : sig f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) -> 'a + val forget : ('msg, 'meta) pool -> P2p_point.Id.t -> unit + val ban : ('msg, 'meta) pool -> P2p_point.Id.t -> unit + val trust : ('msg, 'meta) pool -> P2p_point.Id.t -> unit + val is_banned : ('msg, 'meta) pool -> P2p_point.Id.t -> bool + end val watch: ('msg, 'meta) pool -> P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper diff --git a/src/lib_p2p/test/jbuild b/src/lib_p2p/test/jbuild index a09d53e84..5df3d57f0 100644 --- a/src/lib_p2p/test/jbuild +++ b/src/lib_p2p/test/jbuild @@ -3,7 +3,11 @@ (executables ((names (test_p2p_socket test_p2p_pool - test_p2p_io_scheduler)) + test_p2p_io_scheduler + test_p2p_peerset + test_p2p_ipv6set + test_p2p_banned_peers + )) (libraries (tezos-base tezos-stdlib-unix tezos-shell-services @@ -21,7 +25,11 @@ ((name buildtest) (deps (test_p2p_socket.exe test_p2p_pool.exe - test_p2p_io_scheduler.exe)))) + test_p2p_io_scheduler.exe + test_p2p_peerset.exe + test_p2p_ipv6set.exe + test_p2p_banned_peers.exe + )))) (alias ((name runtest_p2p_socket) @@ -39,11 +47,27 @@ --max-download-speed 1048576 ;; 1 << 20 = 1MB )))) +(alias + ((name runtest_p2p_ipv6set) + (action (run ${exe:test_p2p_ipv6set.exe} -v)))) + +(alias + ((name runtest_p2p_peerset) + (action (run ${exe:test_p2p_peerset.exe} -v)))) + +(alias + ((name runtest_p2p_banned_peers) + (action (run ${exe:test_p2p_banned_peers.exe} -v)))) + (alias ((name runtest) (deps ((alias runtest_p2p_socket) (alias runtest_p2p_pool) - (alias runtest_p2p_io_scheduler))))) + (alias runtest_p2p_io_scheduler) + (alias runtest_p2p_peerset) + (alias runtest_p2p_ipv6set) + (alias runtest_p2p_banned_peers) + )))) (alias ((name runtest_indent) diff --git a/src/lib_p2p/test/test_p2p_banned_peers.ml b/src/lib_p2p/test/test_p2p_banned_peers.ml new file mode 100644 index 000000000..bb4a3ee5d --- /dev/null +++ b/src/lib_p2p/test/test_p2p_banned_peers.ml @@ -0,0 +1,66 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Error_monad + +include Logging.Make (struct let name = "test-p2p-banned_peers" end) + +let assert_equal_bool ~msg a b = + if a <> b then Alcotest.fail msg + +let a = fun (peer,addr) -> + (P2p_peer.Id.hash_string [peer], Ipaddr.V6.of_string_exn addr) + +let foo = a ("foo","ffff::3") +let bar = a ("bar","ffff:00::ff") +let baz = a ("baz","a::2") +let peers = [foo;bar;baz] + +let test_empty _ = + let empty = P2p_acl.create 10 in + List.iter (fun (_peer,addr) -> + assert_equal_bool ~msg:__LOC__ false (P2p_acl.is_banned_addr empty addr) + ) peers ; + Lwt.return () +;; + +let test_ban _ = + let set = P2p_acl.create 10 in + List.iter (fun (_,addr) -> P2p_acl.IPGreylist.add set addr) peers; + List.iter (fun (_,addr) -> + assert_equal_bool ~msg:__LOC__ true (P2p_acl.is_banned_addr set addr) + ) peers ; + Lwt.return () +;; + +let test_gc _ = + let set = P2p_acl.create 10 in + List.iter (fun (_,addr) -> P2p_acl.IPGreylist.add set addr) peers; + List.iter (fun (_peer,addr) -> + assert_equal_bool ~msg:__LOC__ true (P2p_acl.is_banned_addr set addr) + ) peers ; + Lwt_unix.sleep 3. >>= fun _ -> + (* remove all peers after one second *) + P2p_acl.IPGreylist.gc set ~delay:1. ; + List.iter (fun (_peer,addr) -> + assert_equal_bool ~msg:__LOC__ false (P2p_acl.is_banned_addr set addr) + ) peers ; + Lwt.return () + +let () = + let wrap (n, f) = + Alcotest_lwt.test_case n `Quick (fun _ () -> f ()) in + Alcotest.run ~argv:[|""|] "tezos-p2p" [ + "p2p.peerset", + List.map wrap [ + "empty", test_empty ; + "ban", test_ban; + "gc", test_gc; + ] + ] diff --git a/src/lib_p2p/test/test_p2p_ipv6set.ml b/src/lib_p2p/test/test_p2p_ipv6set.ml new file mode 100644 index 000000000..809e19f9b --- /dev/null +++ b/src/lib_p2p/test/test_p2p_ipv6set.ml @@ -0,0 +1,139 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make (struct let name = "test-p2p-banned_ip" end) + +let assert_equal ?(eq = (=)) ?prn ~msg a b = + let msg = match prn with + | None -> msg + | Some prn -> + Format.asprintf "@[%s@,n(%a)@,<>@,(%a)@]" msg prn a prn b in + if not (eq a b) then Alcotest.fail msg + +let assert_equal_bool = assert_equal + +let a = Ipaddr.V6.of_string_exn +let p = Ipaddr.V6.Prefix.of_string_exn + +let timenow = Time.now () + +let of_list l = + List.fold_left (fun acc k -> + P2p_acl.IpSet.add_prefix k timenow acc + ) P2p_acl.IpSet.empty l + +let test_empty _ = + let addrs = List.map a [ "::" ; "ffff::" ; "a::2" ; ] in + List.iter (fun addr -> + assert_equal_bool ~msg:__LOC__ false (P2p_acl.IpSet.mem addr P2p_acl.IpSet.empty) + ) addrs + +let test_inclusion _ = + let set = P2p_acl.IpSet.add_prefix (p "ffff::/16") timenow P2p_acl.IpSet.empty in + let included = List.map a [ "ffff::3" ; "ffff:ffff::" ; "ffff:00::ff" ; ] in + let not_included = List.map a [ "fffe::3" ; "ffee:ffff::" ; "::" ; ] in + List.iter (fun addr -> + assert_equal_bool ~msg:__LOC__ true (P2p_acl.IpSet.mem addr set) + ) included ; + List.iter (fun addr -> + assert_equal_bool ~msg:__LOC__ false (P2p_acl.IpSet.mem addr set) + ) not_included; + + let set = P2p_acl.IpSet.add_prefix (p "f000::/4") timenow P2p_acl.IpSet.empty in + assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "e000::") set) ; + + (* Add one IP *) + let set = P2p_acl.IpSet.add_prefix (p "::/128") timenow P2p_acl.IpSet.empty in + assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "1::") set) ; + + let set = P2p_acl.IpSet.add_prefix (p "ffff:eeee::/32") timenow P2p_acl.IpSet.empty in + assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "eeee:ffff::1") set) ; + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "ffff:eeee::1") set) ; + + let set = P2p_acl.IpSet.add_prefix (p "::/17") timenow P2p_acl.IpSet.empty in + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "0000:0000::") set) ; + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "0000:7000::") set) ; + assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "0000:8000::1") set) ; + + let setlist = [p "e000::/4" ; p "a000::/4" ; p "ffff::/16"] in + let set = of_list setlist in + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "ffff::1") set) ; + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "a111:8000::1") set) ; + + let set = of_list [p "e000::/4" ; p "a000::/4" ; + p "1234:5678::1/128"; p "ffff::/16"] in + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "1234:5678::1") set) ; + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "a111:8000::1") set) ; + assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "b111:8000::1") set) ; + assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "1234:5678::100") set) + + +let test_contiguous _ = + let set = of_list [p "::/1" ; p "8000::/1"] in + List.iter (fun addr -> + assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem addr set) + ) [a "00::" ; a "01::" ; a "ff::" ] + +module PSet = Set.Make(Ipaddr.V6.Prefix) + +let test_fold _ = + let addr_list = [p "::/1" ; p "8000::/1" ; p "ffff:ffff::/32" ; ] in + let pset = PSet.of_list addr_list in + let ipv6set = + P2p_acl.IpSet.fold (fun prefix _value s -> + PSet.add prefix s + ) (of_list addr_list) PSet.empty ; + in + assert_equal ~eq:PSet.equal ~msg:__LOC__ ipv6set pset + +let print_pset ppf pset = + PSet.iter (fun p -> + Format.fprintf ppf "%a " Ipaddr.V6.Prefix.pp_hum p + ) pset + +let print_list ppf l = + List.iter (fun p -> + Format.fprintf ppf "%a " Ipaddr.V6.Prefix.pp_hum p + ) l + +let test_to_list _ = + let to_list s = P2p_acl.IpSet.fold (fun k _v acc -> k::acc) s [] in + let list_eq = List.for_all2 (fun x y -> Ipaddr.V6.Prefix.compare x y = 0) in + let assert_equal_set ~msg a b = + let a = List.sort compare a in + let b = List.sort compare (to_list b) in + assert_equal ~prn:print_list ~eq:list_eq ~msg a b in + + let set = P2p_acl.IpSet.add_prefix (p "::/0") timenow P2p_acl.IpSet.empty in + assert_equal ~eq:list_eq ~prn:print_list ~msg:__LOC__ [p "::/0"] (to_list set) ; + + let set = of_list [p "::/1" ; p "8000::/1"] in + assert_equal ~eq:list_eq ~prn:print_list ~msg:__LOC__ [p "8000::/1"; p "::/1" ] (to_list set) ; + + let setlist = [p "1234:5678::/32"] in + let set = of_list setlist in + assert_equal_set ~msg:__LOC__ setlist set ; + + let setlist = [p "e000::/4" ; p "a000::/4" ; + p "ffff::/16" ; + p "1234:5678::/32" ; + ] in + let set = of_list setlist in + assert_equal_set ~msg:__LOC__ setlist set + +let () = + Alcotest.run ~argv:[|""|] "tezos-p2p" [ + "p2p.ipv6set", [ + "empty", `Quick, test_empty ; + "inclusion", `Quick, test_inclusion ; + "contiguous", `Quick, test_contiguous ; + "test_fold", `Quick, test_fold ; + "to_list", `Quick, test_to_list ; + ] + ] diff --git a/src/lib_p2p/test/test_p2p_peerset.ml b/src/lib_p2p/test/test_p2p_peerset.ml new file mode 100644 index 000000000..606d0f692 --- /dev/null +++ b/src/lib_p2p/test/test_p2p_peerset.ml @@ -0,0 +1,57 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make (struct let name = "test-p2p-banned_peers" end) + +let assert_equal_bool ~msg a b = + if a <> b then Alcotest.fail msg + +let a = fun s -> P2p_peer.Id.hash_string [s] + +let test_empty _ = + let peers = List.map a [ "foo"; "bar"; "baz" ; ] in + let empty = P2p_acl.PeerRing.create 10 in + List.iter (fun peer -> + assert_equal_bool ~msg:__LOC__ false (P2p_acl.PeerRing.mem empty peer) + ) peers + +let test_add _ = + let peers = List.map a [ "foo"; "bar"; "baz" ; ] in + let set = P2p_acl.PeerRing.create 10 in + List.iter (fun peer -> P2p_acl.PeerRing.add set peer) peers; + List.iter (fun peer -> + assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set peer) + ) peers + +let test_remove _ = + let peers = List.map a [ "foo"; "bar"; "baz" ; ] in + let set = P2p_acl.PeerRing.create 10 in + List.iter (fun peer -> P2p_acl.PeerRing.add set peer) peers; + assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set (a "bar")); + P2p_acl.PeerRing.remove set (a "bar"); + assert_equal_bool ~msg:__LOC__ false (P2p_acl.PeerRing.mem set (a "bar")) + +let test_overflow _ = + let peers = List.map a [ "foo"; "bar"; "baz" ; ] in + let set = P2p_acl.PeerRing.create 3 in + List.iter (fun peer -> P2p_acl.PeerRing.add set peer) peers; + assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set (a "baz")); + P2p_acl.PeerRing.add set (a "zor"); + assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set (a "zor")); + assert_equal_bool ~msg:__LOC__ false (P2p_acl.PeerRing.mem set (a "baz")) + +let () = + Alcotest.run ~argv:[|""|] "tezos-p2p" [ + "p2p.peerset", [ + "empty", `Quick, test_empty ; + "add", `Quick, test_add; + "overflow", `Quick, test_overflow; + "remove", `Quick, test_remove; + ] + ] diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index 8fb822fac..c729ae22e 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -71,6 +71,7 @@ let detach_node f points n = max_incoming_connections = nb_points ; connection_timeout = 10. ; authentication_timeout = 2. ; + greylist_timeout = 2. ; incoming_app_message_queue_size = None ; incoming_message_queue_size = None ; outgoing_message_queue_size = None ; diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index ec0a696f4..bef0b2fcf 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -484,8 +484,10 @@ module P2p_reader = struct (State.Block.known_invalid chain_db.chain_state) (Block_header.hash head :: hist) >>= fun known_invalid -> if not known_invalid then - chain_db.callback.notify_branch state.gid locator ; - (* TODO Kickban *) + chain_db.callback.notify_branch state.gid locator + else + (* Kickban *) + P2p.temp_ban_peer global_db.p2p state.gid; Lwt.return_unit | Deactivate chain_id -> @@ -508,8 +510,10 @@ module P2p_reader = struct let head = Block_header.hash header in State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid -> if not known_invalid then - chain_db.callback.notify_head state.gid header mempool ; - (* TODO Kickban *) + chain_db.callback.notify_head state.gid header mempool + else + (* Kickban *) + P2p.temp_ban_peer global_db.p2p state.gid ; Lwt.return_unit | Get_block_headers hashes -> @@ -764,6 +768,9 @@ let get_chain { active_chains } chain_id = try Some (Chain_id.Table.find active_chains chain_id) with Not_found -> None +let temp_ban { global_db = { p2p } } peer_id = + Lwt.return (P2p.temp_ban_peer p2p peer_id) + let disconnect { global_db = { p2p } } peer_id = match P2p.find_connection p2p peer_id with | None -> Lwt.return_unit diff --git a/src/lib_shell/distributed_db.mli b/src/lib_shell/distributed_db.mli index 8f6ad59f9..129bb94bd 100644 --- a/src/lib_shell/distributed_db.mli +++ b/src/lib_shell/distributed_db.mli @@ -54,6 +54,9 @@ val set_callback: chain_db -> callback -> unit (** Kick a given peer. *) val disconnect: chain_db -> P2p_peer.Id.t -> unit Lwt.t +(** Temporarily ban of a given peer. *) +val temp_ban: chain_db -> P2p_peer.Id.t -> unit Lwt.t + (** Various accessors. *) val chain_state: chain_db -> State.Chain.t val db: chain_db -> db diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index 98dd4742f..d43bdbfed 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -251,7 +251,7 @@ let on_error w r st errs = ((( Validation_errors.Unknown_ancestor | Validation_errors.Invalid_locator _ | Block_validator_errors.Invalid_block _ ) :: _) as errors ) -> - (* TODO ban the peer_id... *) + Distributed_db.temp_ban pv.parameters.chain_db pv.peer_id >>= fun () -> debug w "Terminating the validation worker for peer %a (kickban)." P2p_peer.Id.pp_short pv.peer_id ; @@ -267,7 +267,7 @@ let on_error w r st errs = protocol >>= function | Ok _ -> return () | Error _ -> - (* TODO penality... *) + (* TODO: punish *) debug w "Terminating the validation worker for peer %a \ (missing protocol %a)." diff --git a/src/lib_shell_services/p2p_errors.ml b/src/lib_shell_services/p2p_errors.ml index 3472b9f18..1f6995682 100644 --- a/src/lib_shell_services/p2p_errors.ml +++ b/src/lib_shell_services/p2p_errors.ml @@ -145,6 +145,8 @@ type error += Connection_refused type error += Rejected of P2p_peer.Id.t type error += Too_many_connections type error += Closed_network +type error += Point_banned of P2p_point.Id.t +type error += Peer_banned of P2p_peer.Id.t let () = (* Pending connection *) @@ -207,4 +209,30 @@ let () = ~pp:(fun ppf () -> Format.fprintf ppf "Network is closed.") Data_encoding.empty (function Closed_network -> Some () | _ -> None) - (fun () -> Closed_network) + (fun () -> Closed_network) ; + (* Point Banned *) + register_error_kind + `Permanent + ~id:"node.p2p_pool.point_banned" + ~title:"Point Banned" + ~description:"The addr you tried to connect is banned." + ~pp:(fun ppf point -> + Format.fprintf ppf + "The addr you tried to connect (%a) is banned." + P2p_addr.pp (fst point)) + Data_encoding.(obj1 (req "point" P2p_point.Id.encoding)) + (function Point_banned point -> Some point | _ -> None) + (fun point -> Point_banned point) ; + (* Peer Banned *) + register_error_kind + `Permanent + ~id:"node.p2p_pool.peer_banned" + ~title:"Peer Banned" + ~description:"The peer identity you tried to connect is banned." + ~pp:(fun ppf peer_id -> + Format.fprintf ppf + "The peer identity you tried to connect (%a) is banned." + P2p_peer.Id.pp peer_id) + Data_encoding.(obj1 (req "peer" P2p_peer.Id.encoding)) + (function Peer_banned peer_id -> Some peer_id | _ -> None) + (fun peer_id -> Peer_banned peer_id) diff --git a/src/lib_shell_services/p2p_services.ml b/src/lib_shell_services/p2p_services.ml index ba4d27f5e..7064770fd 100644 --- a/src/lib_shell_services/p2p_services.ml +++ b/src/lib_shell_services/p2p_services.ml @@ -15,7 +15,7 @@ module S = struct ~query: RPC_query.empty ~input: Data_encoding.empty ~output: (Data_encoding.list P2p_version.encoding) - RPC_path.(root / "p2p" / "versions") + RPC_path.(root / "network" / "versions") let stat = RPC_service.post_service @@ -23,7 +23,7 @@ module S = struct ~query: RPC_query.empty ~input: Data_encoding.empty ~output: P2p_stat.encoding - RPC_path.(root / "p2p" / "stat") + RPC_path.(root / "network" / "stat") let events = RPC_service.post_service @@ -31,7 +31,7 @@ module S = struct ~query: RPC_query.empty ~input: Data_encoding.empty ~output: P2p_connection.Pool_event.encoding - RPC_path.(root / "p2p" / "log") + RPC_path.(root / "network" / "log") let connect = RPC_service.post_service @@ -39,7 +39,7 @@ module S = struct ~query: RPC_query.empty ~input: Data_encoding.(obj1 (dft "timeout" float 5.)) ~output: Data_encoding.empty - RPC_path.(root / "p2p" / "connect" /: P2p_point.Id.rpc_arg) + RPC_path.(root / "network" / "connect" /: P2p_point.Id.rpc_arg) end @@ -62,7 +62,7 @@ module Connections = struct ~query: RPC_query.empty ~input: Data_encoding.empty ~output: (Data_encoding.list P2p_connection.Info.encoding) - RPC_path.(root / "p2p" / "connections") + RPC_path.(root / "network" / "connections") let info = RPC_service.post_service @@ -70,7 +70,7 @@ module Connections = struct ~input: Data_encoding.empty ~output: P2p_connection.Info.encoding ~description:"Details about the current P2P connection to the given peer." - RPC_path.(root / "p2p" / "connections" /: P2p_peer.Id.rpc_arg) + RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg) let kick = RPC_service.post_service @@ -78,7 +78,7 @@ module Connections = struct ~input: Data_encoding.(obj1 (req "wait" bool)) ~output: Data_encoding.empty ~description:"Forced close of the current P2P connection to the given peer." - RPC_path.(root / "p2p" / "connections" /: P2p_peer.Id.rpc_arg / "kick") + RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg / "kick") end @@ -98,7 +98,7 @@ module Points = struct ~input: Data_encoding.empty ~output: P2p_point.Info.encoding ~description: "Details about a given `IP:addr`." - RPC_path.(root / "p2p" / "points" /: P2p_point.Id.rpc_arg) + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg) let events = RPC_service.post_service @@ -107,7 +107,7 @@ module Points = struct ~output: (Data_encoding.list P2p_point.Pool_event.encoding) ~description: "Monitor network events related to an `IP:addr`." - RPC_path.(root / "p2p" / "points" /: P2p_point.Id.rpc_arg / "log") + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "log") let list = let filter = @@ -121,8 +121,42 @@ module Points = struct P2p_point.Id.encoding P2p_point.Info.encoding)) ~description:"List the pool of known `IP:port` \ - used for establishing P2P connections ." - RPC_path.(root / "networks" / "point") + used for establishing P2P connections." + RPC_path.(root / "network" / "points") + + let forget = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Remove the given address from the whitelist/blacklist." + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "forget" ) + + let ban = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Ban the given address permanently." + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "ban" ) + + let trust = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Trust a given address permanently. \ + Connections from this address can still be closed \ + on authentication if the peer is banned or blocked." + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "trust" ) + + let is_banned = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.bool + ~description:"Check is a given address is blocked, permanently or temporarily." + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "isbanned" ) end @@ -131,6 +165,10 @@ module Points = struct let events ctxt point = make_streamed_call S.events ctxt ((), point) () true let list ?(filter = []) ctxt = make_call S.list ctxt () () filter + let forget ctxt peer_id = make_call1 S.forget ctxt peer_id () () + let ban ctxt peer_id = make_call1 S.ban ctxt peer_id () () + let trust ctxt peer_id = make_call1 S.trust ctxt peer_id () () + let is_banned ctxt peer_id = make_call1 S.is_banned ctxt peer_id () () end @@ -144,7 +182,7 @@ module Peers = struct ~input: Data_encoding.empty ~output: P2p_peer.Info.encoding ~description:"Details about a given peer." - RPC_path.(root / "p2p" / "peers" /: P2p_peer.Id.rpc_arg) + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg) let events = RPC_service.post_service @@ -153,7 +191,7 @@ module Peers = struct ~output: (Data_encoding.list P2p_peer.Pool_event.encoding) ~description:"Monitor network events related to a given peer." - RPC_path.(root / "p2p" / "peers" /: P2p_peer.Id.rpc_arg / "log") + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "log") let list = let filter = @@ -167,7 +205,40 @@ module Peers = struct P2p_peer.Id.encoding P2p_peer.Info.encoding)) ~description:"List the peers the node ever met." - RPC_path.(root / "p2p" / "peers") + RPC_path.(root / "network" / "peers") + + let forget = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Remove the given peer from the whitelist/blacklist." + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "forget" ) + + let ban = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Ban the given peer permanently." + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "ban" ) + + let trust = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Trust a given peer permanently: \ + the peer cannot be blocked (but its machine's IP still can)." + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "trust" ) + + let is_banned = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.bool + ~description:"Check is a given peer is blocked, permanently or temporarily." + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "isbanned" ) end @@ -175,5 +246,26 @@ module Peers = struct let events ctxt point = make_streamed_call S.events ctxt ((), point) () true let list ?(filter = []) ctxt = make_call S.list ctxt () () filter + let forget ctxt point_id = make_call1 S.forget ctxt point_id () () + let ban ctxt point_id = make_call1 S.ban ctxt point_id () () + let trust ctxt point_id = make_call1 S.trust ctxt point_id () () + let is_banned ctxt point_id = make_call1 S.is_banned ctxt point_id () () + +end + +module Greylist = struct + + module S = struct + + let clear = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: Data_encoding.empty + ~description:"Clear all greylists tables." + RPC_path.(root / "network" / "greylist" / "clear" ) + end + + let clear ctxt = make_call S.clear ctxt () () end diff --git a/src/lib_shell_services/p2p_services.mli b/src/lib_shell_services/p2p_services.mli index dad4faac9..ed1cd3eea 100644 --- a/src/lib_shell_services/p2p_services.mli +++ b/src/lib_shell_services/p2p_services.mli @@ -87,6 +87,14 @@ module Points : sig P2p_point.Id.t -> (P2p_point.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t + val forget : #simple -> P2p_point.Id.t -> unit tzresult Lwt.t + + val ban: #simple -> P2p_point.Id.t -> unit tzresult Lwt.t + + val trust: #simple -> P2p_point.Id.t -> unit tzresult Lwt.t + + val is_banned: #simple -> P2p_point.Id.t -> bool tzresult Lwt.t + module S : sig val list : @@ -104,6 +112,27 @@ module Points : sig unit * P2p_point.Id.t, unit, bool, P2p_point.Pool_event.t list) RPC_service.t + val forget : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, unit, + unit) RPC_service.t + + val ban : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, unit, + unit) RPC_service.t + + val trust : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, unit, + unit) RPC_service.t + + val is_banned : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, unit, + bool) RPC_service.t + + end end @@ -122,6 +151,14 @@ module Peers : sig #streamed -> P2p_peer.Id.t -> (P2p_peer.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t + val forget : #simple -> P2p_peer.Id.t -> unit tzresult Lwt.t + + val ban: #simple -> P2p_peer.Id.t -> unit tzresult Lwt.t + + val trust: #simple -> P2p_peer.Id.t -> unit tzresult Lwt.t + + val is_banned: #simple -> P2p_peer.Id.t -> bool tzresult Lwt.t + module S : sig val list : @@ -139,6 +176,41 @@ module Peers : sig unit * P2p_peer.Id.t, unit, bool, P2p_peer.Pool_event.t list) RPC_service.t + val forget : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, unit, + unit) RPC_service.t + + val ban : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, unit, + unit) RPC_service.t + + val trust : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, unit, + unit) RPC_service.t + + val is_banned : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, unit, + bool) RPC_service.t + + end + +end + +module Greylist : sig + + val clear: #simple -> unit -> unit tzresult Lwt.t + + module S : sig + + val clear : + ([ `POST ], unit, + unit, unit, unit, + unit) RPC_service.t + end end diff --git a/src/lib_stdlib/ring.ml b/src/lib_stdlib/ring.ml index 82e824796..cdfe09ba4 100644 --- a/src/lib_stdlib/ring.ml +++ b/src/lib_stdlib/ring.ml @@ -7,69 +7,136 @@ (* *) (**************************************************************************) -type 'a raw = - | Empty of int - | Inited of { - data : 'a array ; - mutable pos : int ; - } +module Ring = struct + type 'a raw = + | Empty of int + | Inited of { + data : 'a array ; + mutable pos : int ; + } -type 'a t = 'a raw ref + type 'a t = 'a raw ref -let create size = ref (Empty size) + let create size = ref (Empty size) -let add r v = - match !r with - | Empty size -> - r := Inited { data = Array.make size v ; pos = 0 } - | Inited s -> - s.pos <- - if s.pos = 2 * Array.length s.data - 1 then - Array.length s.data - else - s.pos + 1 ; - s.data.(s.pos mod Array.length s.data) <- v - -let add_and_return_erased r v = - let replaced = match !r with - | Empty _ -> None + let add r v = + match !r with + | Empty size -> + r := Inited { data = Array.make size v ; pos = 0 } | Inited s -> - if s.pos >= Array.length s.data - 1 then - Some (s.data.(s.pos mod Array.length s.data)) - else - None in - add r v ; replaced + s.pos <- + if s.pos = 2 * Array.length s.data - 1 then + Array.length s.data + else + s.pos + 1 ; + s.data.(s.pos mod Array.length s.data) <- v -let clear r = - match !r with - | Empty _ -> () - | Inited { data ; _ } -> - r := Empty (Array.length data) + let add_and_return_erased r v = + let replaced = match !r with + | Empty _ -> None + | Inited s -> + if s.pos >= Array.length s.data - 1 then + Some (s.data.(s.pos mod Array.length s.data)) + else + None in + add r v ; replaced -let add_list r l = List.iter (add r) l + let clear r = + match !r with + | Empty _ -> () + | Inited { data ; _ } -> + r := Empty (Array.length data) -let last r = - match !r with - | Empty _ -> None - | Inited { data ; pos } -> Some data.(pos mod Array.length data) -let fold r ~init ~f = - match !r with - | Empty _ -> init - | Inited { data ; pos } -> - let size = Array.length data in - let acc = ref init in - for i = 0 to min pos (size - 1) do - acc := f !acc data.((pos - i) mod size) - done ; - !acc + let add_list r l = List.iter (add r) l -let elements t = - fold t ~init:[] ~f:(fun acc elt -> elt :: acc) + let last r = + match !r with + | Empty _ -> None + | Inited { data ; pos } -> Some data.(pos mod Array.length data) -exception Empty + let fold r ~init ~f = + match !r with + | Empty _ -> init + | Inited { data ; pos } -> + let size = Array.length data in + let acc = ref init in + for i = 0 to min pos (size - 1) do + acc := f !acc data.((pos - i) mod size) + done ; + !acc -let last_exn r = - match last r with - | None -> raise Empty - | Some d -> d + let elements t = + fold t ~init:[] ~f:(fun acc elt -> elt :: acc) + + exception Empty + + let last_exn r = + match last r with + | None -> raise Empty + | Some d -> d +end + +include Ring + +(** Ring Buffer Table *) +module type TABLE = sig + type t + type v + val create : int -> t + val clear : t -> unit + val add : t -> v -> unit + val mem : t -> v -> bool + val remove : t -> v -> unit + val elements : t -> v list +end + + +(* fixed size set of Peers id. If the set exceed the maximal allowed capacity, the + element that was added first is removed when a new one is added *) +module MakeTable (V: Hashtbl.HashedType) = struct + module Table = Hashtbl.Make(V) + + type raw = { + size : int ; + mutable capacity : int ; + ring : V.t Ring.t ; + table : unit Table.t ; + } + + type t = raw ref + type v = V.t + + let create size = ref { + size; + capacity = 0; + ring = Ring.create size; + table = Table.create size } + + let add ({contents = t } as tt) v = + assert (t.capacity <= t.size); + if t.capacity = t.size then begin + try Table.remove t.table (Ring.last_exn t.ring) with Ring.Empty -> assert false + end; + Ring.add t.ring v; + Table.replace t.table v (); + let capacity = if t.capacity = t.size then t.capacity else t.capacity + 1 in + tt := { ring = t.ring ; table = t.table; size = t.size ; capacity = capacity } + + let mem t v = Table.mem !t.table v + + let remove ({contents = t } as tt) v = + Table.remove t.table v; + let capacity = if t.capacity = 0 then t.capacity else t.capacity - 1 in + tt := { ring = t.ring ; table = t.table; size = t.size ; capacity = capacity } + + let clear ({contents = t } as tt) = + tt := { t with + capacity = 0; + ring = Ring.create t.size; + table = Table.create t.size + } + + let elements t = Ring.elements !t.ring + +end diff --git a/src/lib_stdlib/ring.mli b/src/lib_stdlib/ring.mli index 0d21aeb3f..5b9c0f037 100644 --- a/src/lib_stdlib/ring.mli +++ b/src/lib_stdlib/ring.mli @@ -13,6 +13,8 @@ a fixed number of values of a same type. Values are never removed, once the limit is reached, adding a value replaces the oldest one in the ring buffer. *) +exception Empty + type 'a t (** Allocates a ring buffer for a given number of values. *) @@ -33,8 +35,6 @@ val clear : 'a t -> unit (** Retrieves the most recent value, or [None] when empty. *) val last : 'a t -> 'a option -exception Empty - (** Same as {!last}, but raises {!Empty} when empty. *) val last_exn : 'a t -> 'a @@ -43,3 +43,31 @@ val fold : 'a t -> init:'b -> f:('b -> 'a -> 'b) -> 'b (** Retrieves the elements as a list, oldest first.. *) val elements : 'a t -> 'a list + +(** Ring Buffer Table *) +module type TABLE = sig + type t + type v + + (** [create size] inizialize an empty ring *) + val create : int -> t + + (** [retest t] remore all bindings from the current ring *) + val clear : t -> unit + + (** [add t v] add a value to the ring. If the ring already contains size elements, + the first element is removed and [v] is added. *) + val add : t -> v -> unit + + (** [mem t v] check if v is in the ring. O(1) *) + val mem : t -> v -> bool + + (** [remove t v] remove one element from the table *) + val remove : t -> v -> unit + + (** [elements t] return the list of elements currently in the ring *) + val elements : t -> v list + +end + +module MakeTable (V: Hashtbl.HashedType) : TABLE with type v = V.t