p2p,node,client: Add Greylists
- add admin commands to ban and unban ips and peers - add greylist_timeout option to configuration file (node) - Add greylist modules + RPC
This commit is contained in:
@ -161,6 +161,21 @@ test:proto_alpha:
- jbuilder build @src/proto_alpha/lib_protocol/runtest
- jbuilder build @src/proto_alpha/lib_protocol/runtest
<<: *test_definition
- jbuilder build @src/lib_p2p/runtest_p2p_peerset
<<: *test_definition
- jbuilder build @src/lib_p2p/runtest_p2p_ipv6set
<<: *test_definition
- jbuilder build @src/lib_p2p/runtest_p2p_banned_peers
# test:client_alpha:transaction:
# test:client_alpha:transaction:
# <<: *test_definition
# <<: *test_definition
# script:
# script:
@ -28,12 +28,15 @@ General operation
I/O Scheduling
I/O Scheduling
The P2P layer uses I/O scheduling in order to be able to control its
The P2P layer uses a scheduling mechanism in order to be able to control its
bandwidth usage as well as implementing different policies
bandwidth usage as well as implementing different policies (e.g. read/write
(e.g. read/write quotas) to different peers. For now, each peer is
quotas) to different peers. For now, each peer is granted a fair share of the
granted a fair share of the global allocated bandwidth, but it is
global allocated bandwidth, but it is planned for the individual allocated
planned for the individual allocated bandwidth to each peer to be a
bandwidth to each peer to be a function of the peer's score. Each connection is
function of the peer's score.
associated tp a read/write queues where data is copied at a rate of equivalent
to max_download_speed / num_connections (resp. max_upload_speed /
@ -83,6 +86,19 @@ layer. It basically runs the ``accept(2)`` syscall and call
that it is made aware of an incoming connection. From there, the pool
that it is made aware of an incoming connection. From there, the pool
will decide how this new connection must be handled.
will decide how this new connection must be handled.
BlackLists, WhiteLists, GreyLists
The welcome worker takes care of filtering all incoming connections using two
static lists of address provided by the admin (either via the ``tezos-admin``
client or directly in the configuration file) and an automatic (grey)list
handled automatically by the p2p layer. The node admin can block or whitelist
individual ip addresses, while the p2p layer can temporarly ban ip addresses and
peer's identities that misbihaved. The delay to remove an ip address from
the greylist table is defined by the varaible ``greylist_timeout``, while
identities are greylisted in a fixed size ring and periodically removed. The
node admin can also flush the greylist tables using the ``tezos-admin`` client.
Maintenance worker
Maintenance worker
@ -111,4 +127,5 @@ Given these bounds, the maintenance worker:
peers until it reaches at least ``min_target`` connections (and never
peers until it reaches at least ``min_target`` connections (and never
more than ``max_target`` connections).
more than ``max_target`` connections).
The maintenance worker is also in charge to periodically run the greylists
GC functions to unban ip addresses from the greylist.
@ -42,7 +42,6 @@ let get_commands_for_version ctxt block protocol =
let select_commands ctxt { block ; protocol } =
let select_commands ctxt { block ; protocol } =
get_commands_for_version ctxt block protocol >>|? fun (_, commands_for_version) ->
get_commands_for_version ctxt block protocol >>|? fun (_, commands_for_version) ->
Client_rpc_commands.commands @
Client_rpc_commands.commands @
Client_p2p_commands.commands () @
Client_keys_commands.commands () @
Client_keys_commands.commands () @
Client_helpers_commands.commands () @
Client_helpers_commands.commands () @
@ -22,7 +22,7 @@ for client in "${client_instances[@]}"; do
echo "### $client p2p stat"
echo "### $client p2p stat"
$client bootstrapped
$client bootstrapped
$client p2p stat
$admin_client network stat
@ -62,6 +62,7 @@ and shell = {
let default_p2p_limits : P2p.limits = {
let default_p2p_limits : P2p.limits = {
connection_timeout = 10. ;
connection_timeout = 10. ;
authentication_timeout = 5. ;
authentication_timeout = 5. ;
greylist_timeout = 86400. ; (* one day *)
min_connections = 10 ;
min_connections = 10 ;
expected_connections = 50 ;
expected_connections = 50 ;
max_connections = 100 ;
max_connections = 100 ;
@ -159,7 +160,7 @@ let default_config = {
let limit : P2p.limits Data_encoding.t =
let limit : P2p.limits Data_encoding.t =
let open Data_encoding in
let open Data_encoding in
(fun { P2p.connection_timeout ; authentication_timeout ;
(fun { P2p.connection_timeout ; authentication_timeout ; greylist_timeout ;
min_connections ; expected_connections ; max_connections ;
min_connections ; expected_connections ; max_connections ;
backlog ; max_incoming_connections ;
backlog ; max_incoming_connections ;
max_download_speed ; max_upload_speed ;
max_download_speed ; max_upload_speed ;
@ -170,8 +171,8 @@ let limit : P2p.limits Data_encoding.t =
max_known_points ; max_known_peer_ids ;
max_known_points ; max_known_peer_ids ;
swap_linger ; binary_chunks_size
swap_linger ; binary_chunks_size
} ->
} ->
(((( connection_timeout,
(((( connection_timeout, authentication_timeout,
authentication_timeout, min_connections, expected_connections,
min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed, swap_linger),
max_download_speed, max_upload_speed, swap_linger),
( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size,
( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size,
@ -179,9 +180,9 @@ let limit : P2p.limits Data_encoding.t =
incoming_message_queue_size, outgoing_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_peer_ids_history_size,
known_points_history_size, known_peer_ids_history_size,
( max_known_peer_ids, greylist_timeout))))
(fun (((( connection_timeout,
(fun (((( connection_timeout, authentication_timeout,
authentication_timeout, min_connections, expected_connections,
min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed, swap_linger),
max_download_speed, max_upload_speed, swap_linger),
( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size,
( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size,
@ -189,8 +190,9 @@ let limit : P2p.limits Data_encoding.t =
incoming_message_queue_size, outgoing_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_peer_ids_history_size,
known_points_history_size, known_peer_ids_history_size,
max_known_peer_ids)) ->
( max_known_peer_ids, greylist_timeout))) ->
{ connection_timeout ; authentication_timeout ; min_connections ; expected_connections ;
{ connection_timeout ; authentication_timeout ; greylist_timeout ;
min_connections ; expected_connections ;
max_connections ; backlog ; max_incoming_connections ;
max_connections ; backlog ; max_incoming_connections ;
max_download_speed ; max_upload_speed ;
max_download_speed ; max_upload_speed ;
read_buffer_size ; read_queue_size ; write_queue_size ;
read_buffer_size ; read_queue_size ; write_queue_size ;
@ -271,8 +273,14 @@ let limit : P2p.limits Data_encoding.t =
(opt "max_known_points" (tup2 uint16 uint16))
(opt "max_known_points" (tup2 uint16 uint16))
(opt "max_known_peer_ids" (tup2 uint16 uint16))))
(opt "max_known_peer_ids" (tup2 uint16 uint16))
(dft "greylist-timeout"
~description: "GC delay for the greylists tables, in seconds."
float) default_p2p_limits.greylist_timeout)
let p2p =
let p2p =
let open Data_encoding in
let open Data_encoding in
@ -26,3 +26,23 @@ let encoding =
type port = int
type port = int
let pp ppf addr =
match Ipaddr.v4_of_v6 addr with
| Some addr ->
Format.fprintf ppf "%a" Ipaddr.V4.pp_hum addr
| None ->
Format.fprintf ppf "[%a]" Ipaddr.V6.pp_hum addr
let of_string_exn str =
match Ipaddr.of_string_exn str with
| V4 addr -> Ipaddr.v6_of_v4 addr
| V6 addr -> addr
let of_string str =
try Ok (of_string_exn str) with
| Invalid_argument s -> Error s
| Failure s -> Error s
| _ -> Error "P2p_addr.of_string"
let to_string saddr = Format.asprintf "%a" pp saddr
@ -11,3 +11,10 @@ type t = Ipaddr.V6.t
type port = int
type port = int
val encoding : t Data_encoding.t
val encoding : t Data_encoding.t
val pp : Format.formatter -> t -> unit
val of_string : string -> (t, string) result
val of_string_exn : string -> t
val to_string : t -> string
@ -10,6 +10,8 @@
type t
type t
include Compare.S with type t := t
include Compare.S with type t := t
val hash : t -> int
val min_value : t
val min_value : t
val epoch : t
val epoch : t
val max_value : t
val max_value : t
@ -11,8 +11,24 @@ let group =
{ Clic.name = "p2p" ;
{ Clic.name = "p2p" ;
title = "Commands for monitoring and controlling p2p-layer state" }
title = "Commands for monitoring and controlling p2p-layer state" }
let commands () = [
let port_arg () =
let open Clic in
let open Clic in
~doc:"peer-to-peer port of the node"
~default: "9732"
(fun _ x -> try
return (int_of_string x)
with Failure _ ->
failwith "Invalid peer-to-peer port"))
let commands () =
let open Clic in
let addr_parameter = parameter (fun _ x -> return (P2p_addr.of_string_exn x)) in
command ~group ~desc: "show global network status"
command ~group ~desc: "show global network status"
(prefixes ["p2p" ; "stat"] stop) begin fun () (cctxt : #Client_context.full) ->
(prefixes ["p2p" ; "stat"] stop) begin fun () (cctxt : #Client_context.full) ->
@ -65,5 +81,109 @@ let commands () = [
(if pi.trusted then "★" else " ")
(if pi.trusted then "★" else " ")
end points >>= fun () ->
end points >>= fun () ->
return ()
return ()
end ;
command ~group ~desc: "Connect to a new point."
(args1 (port_arg ()))
(prefixes [ "connect" ; "address" ]
@@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter
@@ stop)
(fun port address (cctxt : #Client_context.full) ->
P2p_services.connect cctxt ~timeout:10. (address,port) >>=? fun () ->
return ()
command ~group ~desc: "Remove an IP address from the blacklist and whitelist."
(prefixes [ "forget" ; "address" ]
@@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter
@@ stop)
(fun () address (cctxt : #Client_context.full) ->
P2p_services.Points.forget cctxt (address,0) >>=? fun () ->
return ()
command ~group ~desc: "Add a IP address to the blacklist."
(prefixes [ "ban" ; "address" ]
@@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter
@@ stop)
(fun () address (cctxt : #Client_context.full) ->
P2p_services.Points.ban cctxt (address,0) >>=? fun () ->
return ()
command ~group ~desc: "Add a IP address to the whitelist."
(prefixes [ "trust" ; "address" ]
@@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter
@@ stop)
(fun () address (cctxt : #Client_context.full) ->
P2p_services.Points.trust cctxt (address,0) >>=? fun () ->
return ()
command ~group ~desc: "Check if an IP address is banned."
(prefixes [ "is" ; "address" ; "banned" ]
@@ param ~name:"address" ~desc:"IPv4 or IPV6 address" addr_parameter
@@ stop)
(fun () address (cctxt : #Client_context.full) ->
P2p_services.Points.is_banned cctxt (address,0) >>=? fun answer ->
"The given ip address is %s"
(if answer then "banned" else "not banned") >>= fun () ->
return ()
command ~group ~desc: "Remove a peer ID from the blacklist and whitelist."
(prefixes [ "forget" ; "peer" ]
@@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity"
@@ stop)
(fun () peer (cctxt : #Client_context.full) ->
P2p_services.Peers.forget cctxt peer >>=? fun () ->
return ()
command ~group ~desc: "Add a peer ID to the blacklist."
(prefixes [ "ban" ; "peer" ]
@@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity"
@@ stop)
(fun () peer (cctxt : #Client_context.full) ->
P2p_services.Peers.ban cctxt peer >>=? fun () ->
return ()
command ~group ~desc: "Add a peer ID to the whitelist."
(prefixes [ "trust" ; "peer" ]
@@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity"
@@ stop)
(fun () peer (cctxt : #Client_context.full) ->
P2p_services.Peers.trust cctxt peer >>=? fun () ->
return ()
command ~group ~desc: "Check if a peer ID is banned."
(prefixes [ "is" ; "peer" ; "banned" ]
@@ P2p_peer.Id.param ~name:"peer" ~desc:"peer network identity"
@@ stop)
(fun () peer (cctxt : #Client_context.full) ->
P2p_services.Peers.is_banned cctxt peer >>=? fun answer ->
"The given peer ID is %s"
(if answer then "banned" else "not banned") >>= fun () ->
return ()
command ~group ~desc: "Clear all greylist tables."
(prefixes [ "clear" ; "greylists" ] @@ stop)
(fun () (cctxt : #Client_context.full) ->
P2p_services.Greylist.clear cctxt () >>=? fun () ->
return ()
@ -43,6 +43,7 @@ type limits = {
connection_timeout : float ;
connection_timeout : float ;
authentication_timeout : float ;
authentication_timeout : float ;
greylist_timeout : float ;
min_connections : int ;
min_connections : int ;
expected_connections : int ;
expected_connections : int ;
@ -97,6 +98,7 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched =
max_incoming_connections = limits.max_incoming_connections ;
max_incoming_connections = limits.max_incoming_connections ;
connection_timeout = limits.connection_timeout ;
connection_timeout = limits.connection_timeout ;
authentication_timeout = limits.authentication_timeout ;
authentication_timeout = limits.authentication_timeout ;
greylist_timeout = limits.greylist_timeout ;
incoming_app_message_queue_size = limits.incoming_app_message_queue_size ;
incoming_app_message_queue_size = limits.incoming_app_message_queue_size ;
incoming_message_queue_size = limits.incoming_message_queue_size ;
incoming_message_queue_size = limits.incoming_message_queue_size ;
outgoing_message_queue_size = limits.outgoing_message_queue_size ;
outgoing_message_queue_size = limits.outgoing_message_queue_size ;
@ -132,7 +134,7 @@ let create_maintenance_worker limits pool =
P2p_maintenance.run bounds pool
P2p_maintenance.run bounds ~greylist_timeout:limits.greylist_timeout pool
let may_create_welcome_worker config limits pool =
let may_create_welcome_worker config limits pool =
match config.listening_port with
match config.listening_port with
@ -444,6 +446,16 @@ let fold_connections net = net.fold_connections
let iter_connections net = net.iter_connections
let iter_connections net = net.iter_connections
let on_new_connection net = net.on_new_connection
let on_new_connection net = net.on_new_connection
let temp_ban_peer net peer_id =
match net.pool with
|None -> ()
|Some pool -> P2p_pool.temp_ban_peer pool peer_id
let temp_ban_addr net addr =
match net.pool with
|None -> ()
|Some pool -> P2p_pool.temp_ban_addr pool addr
module Raw = struct
module Raw = struct
type 'a t = 'a P2p_pool.Message.t =
type 'a t = 'a P2p_pool.Message.t =
| Bootstrap
| Bootstrap
@ -536,6 +548,9 @@ let build_rpc_directory net =
match net.pool with
match net.pool with
| None -> failwith "The node has disable the P2P layer."
| None -> failwith "The node has disable the P2P layer."
| Some pool ->
| Some pool ->
(not(P2p_pool.Points.is_banned pool point))
(P2p_errors.Point_banned point) >>=? fun () ->
ignore (P2p_pool.connect ~timeout pool point : _ tzresult Lwt.t) ;
ignore (P2p_pool.connect ~timeout pool point : _ tzresult Lwt.t) ;
return ()
return ()
end in
end in
@ -638,6 +653,37 @@ let build_rpc_directory net =
RPC_answer.return_stream { next ; shutdown }
RPC_answer.return_stream { next ; shutdown }
end in
end in
let dir =
RPC_directory.gen_register1 dir P2p_services.Peers.S.ban
begin fun peer_id () () ->
match net.pool with
| None -> RPC_answer.not_found
| Some pool ->
P2p_pool.Peers.unset_trusted pool peer_id;
P2p_pool.Peers.ban pool peer_id ;
RPC_answer.return ()
end in
let dir =
RPC_directory.gen_register1 dir P2p_services.Peers.S.trust
begin fun peer_id () () ->
match net.pool with
| None -> RPC_answer.not_found
| Some pool ->
P2p_pool.Peers.set_trusted pool peer_id ;
RPC_answer.return ()
end in
let dir =
RPC_directory.register1 dir P2p_services.Peers.S.is_banned
begin fun peer_id () () ->
match net.pool with
| None -> return false
| Some pool ->
if P2p_pool.Peers.get_trusted pool peer_id then return false
else return (P2p_pool.Peers.is_banned pool peer_id)
end in
(* Network : Point *)
(* Network : Point *)
let dir =
let dir =
@ -700,4 +746,49 @@ let build_rpc_directory net =
RPC_answer.return_stream { next ; shutdown }
RPC_answer.return_stream { next ; shutdown }
end in
end in
let dir =
RPC_directory.gen_register1 dir P2p_services.Points.S.ban
begin fun point () () ->
match net.pool with
| None -> RPC_answer.not_found
| Some pool ->
P2p_pool.Points.unset_trusted pool point;
P2p_pool.Points.ban pool point;
RPC_answer.return ()
end in
let dir =
RPC_directory.gen_register1 dir P2p_services.Points.S.trust
begin fun point () () ->
match net.pool with
| None -> RPC_answer.not_found
| Some pool ->
P2p_pool.Points.set_trusted pool point ;
RPC_answer.return ()
end in
let dir =
RPC_directory.gen_register1 dir P2p_services.Points.S.is_banned
begin fun point () () ->
match net.pool with
| None -> RPC_answer.return false
| Some pool ->
if P2p_pool.Points.get_trusted pool point then
RPC_answer.return false
RPC_answer.return (P2p_pool.Points.is_banned pool point)
end in
(* Network : Greylist *)
let dir =
RPC_directory.register dir P2p_services.Greylist.S.clear
begin fun () () () ->
match net.pool with
| None -> return ()
| Some pool ->
P2p_pool.greylist_clear pool ;
return ()
end in
@ -73,6 +73,9 @@ type limits = {
authentication_timeout : float ;
authentication_timeout : float ;
(** Delay granted to a peer to perform authentication, in seconds. *)
(** Delay granted to a peer to perform authentication, in seconds. *)
greylist_timeout : float ;
(** GC delay for the grelists tables, in seconds. *)
min_connections : int ;
min_connections : int ;
(** Strict minimum number of connections (triggers an urgent maintenance) *)
(** Strict minimum number of connections (triggers an urgent maintenance) *)
@ -210,6 +213,9 @@ val on_new_connection :
val build_rpc_directory : _ t -> unit RPC_directory.t
val build_rpc_directory : _ t -> unit RPC_directory.t
val temp_ban_addr : ('msg, 'meta) net -> P2p_addr.t -> unit
val temp_ban_peer : ('msg, 'meta) net -> P2p_peer.Id.t -> unit
module Raw : sig
module Raw : sig
Normal file
Normal file
@ -0,0 +1,223 @@
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
module PeerRing = Ring.MakeTable(struct
include P2p_peer.Id
let hash = Hashtbl.hash
module PatriciaTree(V:HashPtree.Value) = struct
module Size = struct
let size = 128
module Bits = HashPtree.Bits(Size)
module M = HashPtree.Make_BE_sized(V)(Size)
type t = M.t
let empty = M.empty
(* take into consideration the fact that the int64
* returned by Ipaddr.V6.to_int64 is signed *)
let z_of_bytes i =
let i = Z.of_int64 i in
Z.(if i < zero then i + of_int 2 ** 64 else i)
let z_of_ipv6 ip =
let hi_x, lo_x = Ipaddr.V6.to_int64 ip in
let hi = z_of_bytes hi_x in
let lo = z_of_bytes lo_x in
Z.((hi lsl 64) + lo)
let key_of_ipv6 ip =
Bits.of_z (z_of_ipv6 ip)
let z_mask_of_ipv6_prefix p =
let ip = Ipaddr.V6.Prefix.network p in
let len = Ipaddr.V6.Prefix.bits p in
z_of_ipv6 ip, Z.(lsl) Z.one (128 - len)
let key_mask_of_ipv6_prefix p =
let z, m = z_mask_of_ipv6_prefix p in
Bits.of_z z, Bits.of_z m
let z_to_ipv6 z =
(* assumes z is a 128 bit value *)
let hi_z = Z.(z asr 64) in
let hi =
if Z.(hi_z >= of_int 2 ** 63) then
(* If overflows int64, then returns the bit equivalent
representation (which is negative) *)
Int64.add 0x8000000000000000L
((Z.(to_int64 (hi_z - (of_int 2 ** 63)))))
Z.(to_int64 hi_z)
let lo = Z.(to_int64 (z mod (pow ~$2 64))) in
Ipaddr.V6.of_int64 (hi, lo)
let remove key t =
M.remove (key_of_ipv6 key) t
let remove_prefix prefix t =
let key, mask = key_mask_of_ipv6_prefix prefix in
M.remove_prefix key mask t
let add_prefix prefix value t =
let key, mask = key_mask_of_ipv6_prefix prefix in
M.add (fun _ v -> v) ~key ~value ~mask t
let add key value t =
let key = key_of_ipv6 key in
M.add (fun _ v -> v) ~key ~value t
let mem key t = M.mem (key_of_ipv6 key) t
let key_mask_to_prefix key mask =
let len =
if Bits.(equal mask zero) then 0
else 128 - (Z.trailing_zeros (Bits.to_z mask))
Ipaddr.V6.Prefix.make len (z_to_ipv6 (Bits.to_z key))
let fold f t acc =
let f key mask value acc =
let prefix = key_mask_to_prefix key mask in
f prefix value acc
M.fold f t acc
let pp ppf t =
let lst = fold (fun p _ l -> p :: l) t [] in
Format.fprintf ppf "@[<2>[%a]@]"
Format.(pp_print_list ~pp_sep:(fun ppf () -> fprintf ppf ";@ ")
(* patricia trees using IpV6 addresses as keys *)
module IpSet = struct
include PatriciaTree(Time)
let gc t ~delay =
let timenow = Time.now() in
let module MI =
type result = Time.t
let default = Time.now()
let map _t _key value = value
let reduce _t left right = Time.(max left right)
let module MR = M.Map_Reduce(MI) in
MR.filter (fun addtime ->
Time.(timenow < (add addtime (Int64.of_float delay)))
) t
module IpTable = Hashtbl.Make(struct
type t = Ipaddr.V6.t
let hash = Hashtbl.hash
let equal x y = Ipaddr.V6.compare x y = 0
type raw = {
mutable greylist_ips : IpSet.t ;
greylist_peers : PeerRing.t ;
banned_ips : unit IpTable.t ;
banned_peers : unit P2p_peer.Table.t ;
type t = raw ref
let create size = ref {
greylist_ips = IpSet.empty;
greylist_peers = PeerRing.create size;
banned_ips = IpTable.create 53;
banned_peers = P2p_peer.Table.create 53;
(* check if an ip is banned. priority is for static blacklist, then
in the greylist *)
let is_banned_addr acl addr =
(IpTable.mem !acl.banned_ips addr) ||
(IpSet.mem addr !acl.greylist_ips)
(* Check is the peer_id is in the banned ring. It might be possible that
a peer ID that is not banned, but its ip address is. *)
let is_banned_peer acl peer_id =
(P2p_peer.Table.mem !acl.banned_peers peer_id) ||
(PeerRing.mem !acl.greylist_peers peer_id)
let greylist_clear acl =
!acl.greylist_ips <- IpSet.empty;
P2p_peer.Table.clear !acl.banned_peers;
IpTable.clear !acl.banned_ips;
PeerRing.clear !acl.greylist_peers
module IPGreylist = struct
(* Add the given ip address to the ip greylist *)
let add acl addr =
!acl.greylist_ips <- IpSet.add addr (Time.now()) !acl.greylist_ips
let mem acl addr = IpSet.mem addr !acl.greylist_ips
(* The GC operation works only on the address set. Peers are removed
from the ring in a round-robin fashion. If a address is removed
by the GC from the acl.greylist set, it could potentially
persist in the acl.peers set until more peers are banned. *)
let gc acl ~delay =
!acl.greylist_ips <- IpSet.gc !acl.greylist_ips ~delay
let encoding = Data_encoding.(list P2p_addr.encoding)
module IPBlacklist = struct
(* Add the given ip address to the ip blacklist *)
let add acl addr =
IpTable.add !acl.banned_ips addr ()
(* Remove the given ip address to the ip blacklist *)
let remove acl addr =
IpTable.remove !acl.banned_ips addr
let mem acl addr =
IpTable.mem !acl.banned_ips addr
module PeerBlacklist = struct
let add acl addr =
P2p_peer.Table.add !acl.banned_peers addr ()
let remove acl addr =
P2p_peer.Table.remove !acl.banned_peers addr
let mem acl addr =
P2p_peer.Table.mem !acl.banned_peers addr
module PeerGreylist = struct
(* Ban the given peer_id. It also add the given ip address to the blacklist. *)
let add acl peer_id =
PeerRing.add !acl.greylist_peers peer_id
let mem acl peer_id =
(PeerRing.mem !acl.greylist_peers peer_id)
Normal file
Normal file
@ -0,0 +1,100 @@
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
This module implements four tables
- ip grey lists used to automatically ban a given ip addr
- peer_id greylist used to automatically ban a given peer_id
- ip black lists used to manually ban a given ip addr
- peers black list used to manually trust a given peer_id
IP greylists use a time based GC to periodically remove entries from
the table, while peer_id grey lists are built using a ring structure,
where peers are removed from the table when removed from the fixed size
ring. Other tables are user defined and static.
type t
(** Create a new ACL of given size *)
val create : int -> t
(** Check if an address is banned either temporally or permanently *)
val is_banned_addr : t -> P2p_addr.t -> bool
(** Check if a peer is banned either temporally or permanently *)
val is_banned_peer : t -> P2p_peer.Id.t -> bool
(** Reinitialize the Greylist tables *)
val greylist_clear : t -> unit
module IPGreylist : sig
(* Add the given ip address to the ip greylist *)
val add: t -> P2p_addr.t -> unit
(** [gc time] removes all banned peers older than the given time in seconds
The GC operation works only on the address set. Peers are removed
from the ring in a round-robin fashion. If a address is removed
by the GC from the greylist set, it could potentially
persist in the peers' blacklist set until more peers are banned. *)
val gc: t -> delay:float -> unit
val encoding: P2p_addr.t list Data_encoding.t
module IPBlacklist : sig
(* Add the given ip address to the ip blacklist *)
val add: t -> P2p_addr.t -> unit
(* Remove the given ip address to the ip blacklist *)
val remove: t -> P2p_addr.t -> unit
module PeerBlacklist : sig
(* Add the given ip address to the ip blacklist *)
val add: t -> P2p_peer.Id.t -> unit
(* Remove the given ip address to the ip blacklist *)
val remove: t -> P2p_peer.Id.t -> unit
module PeerGreylist : sig
(* Ban the given peer_id. It also add the given ip address to the blacklist. *)
val add: t -> P2p_peer.Id.t -> unit
(** / *)
module PeerRing : Ring.TABLE with type v = P2p_peer.Id.t
module IpSet : sig
type t
val empty: t
val add : Ipaddr.V6.t -> Time.t -> t -> t
val add_prefix : Ipaddr.V6.Prefix.t -> Time.t -> t -> t
val remove : Ipaddr.V6.t -> t -> t
val remove_prefix : Ipaddr.V6.Prefix.t -> t -> t
val mem : Ipaddr.V6.t -> t -> bool
val fold: (Ipaddr.V6.Prefix.t -> Time.t -> 'a -> 'a) -> t -> 'a -> 'a
val pp : Format.formatter -> t -> unit
val gc : t -> delay:float -> t
module IpTable : Hashtbl.S with type key = Ipaddr.V6.t
@ -20,6 +20,7 @@ type 'meta pool = Pool : ('msg, 'meta) P2p_pool.t -> 'meta pool
type 'meta t = {
type 'meta t = {
canceler: Lwt_canceler.t ;
canceler: Lwt_canceler.t ;
greylist_timeout: float;
bounds: bounds ;
bounds: bounds ;
pool: 'meta pool ;
pool: 'meta pool ;
just_maintained: unit Lwt_condition.t ;
just_maintained: unit Lwt_condition.t ;
@ -29,8 +30,8 @@ type 'meta t = {
(** Select [expected] points amongst the disconnected known points.
(** Select [expected] points amongst the disconnected known points.
It ignores points which are greylisted, or for which a connection
It ignores points which are greylisted, or for which a connection
failed after [start_time]. It first selects points with the oldest
failed after [start_time] and the pointes that are banned. It
last tentative. *)
first selects points with the oldest last tentative. *)
let connectable st start_time expected =
let connectable st start_time expected =
let Pool pool = st.pool in
let Pool pool = st.pool in
let now = Time.now () in
let now = Time.now () in
@ -52,6 +53,7 @@ let connectable st start_time expected =
match P2p_point_state.Info.last_miss pi with
match P2p_point_state.Info.last_miss pi with
| Some last when Time.(start_time < last)
| Some last when Time.(start_time < last)
|| P2p_point_state.Info.greylisted ~now pi -> ()
|| P2p_point_state.Info.greylisted ~now pi -> ()
| _ when (P2p_pool.Points.is_banned pool point) -> ()
| last ->
| last ->
Bounded_point_info.insert (last, point) acc
Bounded_point_info.insert (last, point) acc
@ -87,10 +89,13 @@ let rec try_to_contact
(min_to_contact - established) (max_to_contact - established)
(min_to_contact - established) (max_to_contact - established)
(** Do a maintenance step. It will terminate only when the number
(** Do a maintenance step. It will terminate only when the number
of connections is between `min_threshold` and `max_threshold`. *)
of connections is between `min_threshold` and `max_threshold`.
Do a pass in the list of banned peers and remove all peers that
have been banned for more then xxx seconds *)
let rec maintain st =
let rec maintain st =
let Pool pool = st.pool in
let Pool pool = st.pool in
let n_connected = P2p_pool.active_connections pool in
let n_connected = P2p_pool.active_connections pool in
P2p_pool.gc_greylist pool ~delay:st.greylist_timeout;
if n_connected < st.bounds.min_threshold then
if n_connected < st.bounds.min_threshold then
too_few_connections st n_connected
too_few_connections st n_connected
else if st.bounds.max_threshold < n_connected then
else if st.bounds.max_threshold < n_connected then
@ -163,10 +168,11 @@ let rec worker_loop st =
| Error [ Canceled ] -> Lwt.return_unit
| Error [ Canceled ] -> Lwt.return_unit
| Error _ -> Lwt.return_unit
| Error _ -> Lwt.return_unit
let run bounds pool =
let run ~greylist_timeout bounds pool =
let canceler = Lwt_canceler.create () in
let canceler = Lwt_canceler.create () in
let st = {
let st = {
canceler ;
canceler ;
greylist_timeout ;
bounds ;
bounds ;
pool = Pool pool ;
pool = Pool pool ;
just_maintained = Lwt_condition.create () ;
just_maintained = Lwt_condition.create () ;
@ -37,9 +37,11 @@ type 'meta t
(** Type of a maintenance worker. *)
(** Type of a maintenance worker. *)
val run:
val run:
greylist_timeout:float ->
bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t
bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t
(** [run bounds pool] is a maintenance worker for [pool] with
(** [run ~greylist_timeout bounds pool] is a maintenance worker for
connection targets specified in [bounds]. *)
[pool] with connection targets specified in [bounds] and greylist
GC frequency [greylist_timeout]. *)
val maintain: 'meta t -> unit Lwt.t
val maintain: 'meta t -> unit Lwt.t
(** [maintain t] gives a hint to maintenance worker [t] that
(** [maintain t] gives a hint to maintenance worker [t] that
@ -176,6 +176,7 @@ type config = {
max_incoming_connections : int ;
max_incoming_connections : int ;
connection_timeout : float ;
connection_timeout : float ;
authentication_timeout : float ;
authentication_timeout : float ;
greylist_timeout : float ;
incoming_app_message_queue_size : int option ;
incoming_app_message_queue_size : int option ;
incoming_message_queue_size : int option ;
incoming_message_queue_size : int option ;
@ -218,6 +219,7 @@ type ('msg, 'meta) t = {
encoding : 'msg Message.t Data_encoding.t ;
encoding : 'msg Message.t Data_encoding.t ;
events : events ;
events : events ;
watcher : P2p_connection.Pool_event.t Lwt_watcher.input ;
watcher : P2p_connection.Pool_event.t Lwt_watcher.input ;
acl : P2p_acl.t;
mutable new_connection_hook :
mutable new_connection_hook :
(P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) list ;
(P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) list ;
mutable latest_accepted_swap : Time.t ;
mutable latest_accepted_swap : Time.t ;
@ -396,12 +398,76 @@ let broadcast_bootstrap_msg pool =
(* this function duplicates bit of code from the modules below to avoid
creating mutually recurvive modules *)
let get_addr pool peer_id =
let info peer_id =
try Some (P2p_peer.Table.find pool.known_peer_ids peer_id)
with Not_found -> None
let find_by_peer_id peer_id =
(info peer_id)
~f:(fun p ->
match P2p_peer_state.get p with
| Running { data } -> Some data
| _ -> None)
match find_by_peer_id peer_id with
|None -> None
|Some ci ->
let info = P2p_socket.info ci.conn in
module Points = struct
type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t
let info { known_points } point =
try Some (P2p_point.Table.find known_points point)
with Not_found -> None
let get_trusted pool point =
try P2p_point_state.Info.trusted (P2p_point.Table.find pool.known_points point)
with Not_found -> false
let set_trusted pool point =
(register_point pool pool.config.identity.peer_id point)
with Not_found -> ()
let unset_trusted pool point =
try P2p_point_state.Info.unset_trusted (P2p_point.Table.find pool.known_points point)
with Not_found -> ()
let fold_known pool ~init ~f =
P2p_point.Table.fold f pool.known_points init
let fold_connected pool ~init ~f =
P2p_point.Table.fold f pool.connected_points init
let is_banned pool point =
P2p_acl.is_banned_addr pool.acl (fst point)
let ban pool point =
P2p_acl.IPBlacklist.add pool.acl (fst point)
let trust pool point =
P2p_acl.IPBlacklist.remove pool.acl (fst point)
let forget pool point =
unset_trusted pool point; (* remove from whitelist *)
P2p_acl.IPBlacklist.remove pool.acl (fst point)
module Peers = struct
module Peers = struct
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t
let info { known_peer_ids } point =
let info { known_peer_ids } peer_id =
try Some (P2p_peer.Table.find known_peer_ids point)
try Some (P2p_peer.Table.find known_peer_ids peer_id)
with Not_found -> None
with Not_found -> None
let get_metadata pool peer_id =
let get_metadata pool peer_id =
@ -432,35 +498,29 @@ module Peers = struct
let fold_connected pool ~init ~f =
let fold_connected pool ~init ~f =
P2p_peer.Table.fold f pool.connected_peer_ids init
P2p_peer.Table.fold f pool.connected_peer_ids init
let forget pool peer =
match get_addr pool peer with
|None -> ()
|Some point ->
unset_trusted pool peer; (* remove from whitelist *)
P2p_acl.PeerBlacklist.remove pool.acl peer;
P2p_acl.IPBlacklist.remove pool.acl (fst point)
module Points = struct
let ban pool peer =
match get_addr pool peer with
|None -> ()
|Some point ->
Points.ban pool point;
P2p_acl.PeerBlacklist.add pool.acl peer
type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t
let trust pool peer =
match get_addr pool peer with
|None -> ()
|Some point ->
Points.trust pool point
let info { known_points } point =
let is_banned pool peer =
try Some (P2p_point.Table.find known_points point)
P2p_acl.is_banned_peer pool.acl peer
with Not_found -> None
let get_trusted pool point =
try P2p_point_state.Info.trusted (P2p_point.Table.find pool.known_points point)
with Not_found -> false
let set_trusted pool point =
(register_point pool pool.config.identity.peer_id point)
with Not_found -> ()
let unset_trusted pool peer_id =
try P2p_point_state.Info.unset_trusted (P2p_point.Table.find pool.known_points peer_id)
with Not_found -> ()
let fold_known pool ~init ~f =
P2p_point.Table.fold f pool.known_points init
let fold_connected pool ~init ~f =
P2p_point.Table.fold f pool.connected_points init
@ -532,10 +592,24 @@ module Connection = struct
let temp_ban_addr pool addr =
P2p_acl.IPGreylist.add pool.acl addr
let temp_ban_peer pool peer =
match get_addr pool peer with
|None -> ()
|Some point ->
temp_ban_addr pool (fst point);
P2p_acl.PeerGreylist.add pool.acl peer
let greylist_clear pool =
P2p_acl.greylist_clear pool.acl
let gc_greylist ~delay pool = P2p_acl.IPGreylist.gc ~delay pool.acl
let pool_stat { io_sched } =
let pool_stat { io_sched } =
P2p_io_scheduler.global_stat io_sched
P2p_io_scheduler.global_stat io_sched
let fail_unless_disconnected_point point_info =
let fail_unless_disconnected_point point_info =
@ -571,6 +645,9 @@ let compare_known_point_info p1 p2 =
| true, true -> compare_last_seen p2 p1
| true, true -> compare_last_seen p2 p1
let rec connect ?timeout pool point =
let rec connect ?timeout pool point =
(not(Points.is_banned pool point))
(P2p_errors.Point_banned point) >>=? fun () ->
let timeout =
let timeout =
Option.unopt ~default:pool.config.connection_timeout timeout in
Option.unopt ~default:pool.config.connection_timeout timeout in
@ -645,6 +722,9 @@ and authenticate pool ?point_info canceler fd point =
lwt_debug "authenticate: %a -> auth %a"
lwt_debug "authenticate: %a -> auth %a"
P2p_point.Id.pp point
P2p_point.Id.pp point
P2p_connection.Info.pp info >>= fun () ->
P2p_connection.Info.pp info >>= fun () ->
(not(Peers.is_banned pool info.peer_id))
(P2p_errors.Peer_banned info.peer_id) >>=? fun () ->
let remote_point_info =
let remote_point_info =
match info.id_point with
match info.id_point with
| addr, Some port
| addr, Some port
@ -901,7 +981,9 @@ and swap pool conn current_peer_id new_point =
let accept pool fd point =
let accept pool fd point =
log pool (Incoming_connection point) ;
log pool (Incoming_connection point) ;
if pool.config.max_incoming_connections <= P2p_point.Table.length pool.incoming
if pool.config.max_incoming_connections <= P2p_point.Table.length pool.incoming
|| pool.config.max_connections <= active_connections pool then
|| pool.config.max_connections <= active_connections pool
(* silently ignore banned points *)
|| (P2p_acl.is_banned_addr pool.acl (fst point)) then
Lwt.async (fun () -> Lwt_utils_unix.safe_close fd)
Lwt.async (fun () -> Lwt_utils_unix.safe_close fd)
let canceler = Lwt_canceler.create () in
let canceler = Lwt_canceler.create () in
@ -947,6 +1029,7 @@ let create config meta_config message_config io_sched =
encoding = Message.encoding message_config.encoding ;
encoding = Message.encoding message_config.encoding ;
events ;
events ;
watcher = Lwt_watcher.create_input () ;
watcher = Lwt_watcher.create_input () ;
acl = P2p_acl.create 1023;
new_connection_hook = [] ;
new_connection_hook = [] ;
latest_accepted_swap = Time.epoch ;
latest_accepted_swap = Time.epoch ;
latest_succesfull_swap = Time.epoch ;
latest_succesfull_swap = Time.epoch ;
@ -81,6 +81,9 @@ type config = {
authentication_timeout : float ;
authentication_timeout : float ;
(** Delay granted to a peer to perform authentication, in seconds. *)
(** Delay granted to a peer to perform authentication, in seconds. *)
greylist_timeout : float ;
(** GC delay for the grelists tables, in seconds. *)
incoming_app_message_queue_size : int option ;
incoming_app_message_queue_size : int option ;
(** Size of the message queue for user messages (messages returned
(** Size of the message queue for user messages (messages returned
by this module's [read] function. *)
by this module's [read] function. *)
@ -174,6 +177,7 @@ module Pool_event : sig
(** {1 Connections management} *)
(** {1 Connections management} *)
type ('msg, 'meta) connection
type ('msg, 'meta) connection
@ -266,6 +270,11 @@ val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit
(** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap]
(** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap]
for all member connections to [pool] in [Running] state. *)
for all member connections to [pool] in [Running] state. *)
val temp_ban_peer : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val temp_ban_addr : ('msg, 'meta) pool -> P2p_addr.t -> unit
val gc_greylist: delay:float -> ('msg, 'meta) pool -> unit
val greylist_clear : ('msg, 'meta) pool -> unit
(** {1 Functions on [Peer_id]} *)
(** {1 Functions on [Peer_id]} *)
module Peers : sig
module Peers : sig
@ -295,6 +304,11 @@ module Peers : sig
f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
val forget : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val ban : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val trust : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val is_banned : ('msg, 'meta) pool -> P2p_peer.Id.t -> bool
(** {1 Functions on [Points]} *)
(** {1 Functions on [Points]} *)
@ -322,6 +336,11 @@ module Points : sig
f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
val forget : ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val ban : ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val trust : ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val is_banned : ('msg, 'meta) pool -> P2p_point.Id.t -> bool
val watch: ('msg, 'meta) pool -> P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper
val watch: ('msg, 'meta) pool -> P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper
@ -3,7 +3,11 @@
((names (test_p2p_socket
((names (test_p2p_socket
(libraries (tezos-base
(libraries (tezos-base
@ -21,7 +25,11 @@
((name buildtest)
((name buildtest)
(deps (test_p2p_socket.exe
(deps (test_p2p_socket.exe
((name runtest_p2p_socket)
((name runtest_p2p_socket)
@ -39,11 +47,27 @@
--max-download-speed 1048576 ;; 1 << 20 = 1MB
--max-download-speed 1048576 ;; 1 << 20 = 1MB
((name runtest_p2p_ipv6set)
(action (run ${exe:test_p2p_ipv6set.exe} -v))))
((name runtest_p2p_peerset)
(action (run ${exe:test_p2p_peerset.exe} -v))))
((name runtest_p2p_banned_peers)
(action (run ${exe:test_p2p_banned_peers.exe} -v))))
((name runtest)
((name runtest)
(deps ((alias runtest_p2p_socket)
(deps ((alias runtest_p2p_socket)
(alias runtest_p2p_pool)
(alias runtest_p2p_pool)
(alias runtest_p2p_io_scheduler)))))
(alias runtest_p2p_io_scheduler)
(alias runtest_p2p_peerset)
(alias runtest_p2p_ipv6set)
(alias runtest_p2p_banned_peers)
((name runtest_indent)
((name runtest_indent)
Normal file
Normal file
@ -0,0 +1,66 @@
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
open Error_monad
include Logging.Make (struct let name = "test-p2p-banned_peers" end)
let assert_equal_bool ~msg a b =
if a <> b then Alcotest.fail msg
let a = fun (peer,addr) ->
(P2p_peer.Id.hash_string [peer], Ipaddr.V6.of_string_exn addr)
let foo = a ("foo","ffff::3")
let bar = a ("bar","ffff:00::ff")
let baz = a ("baz","a::2")
let peers = [foo;bar;baz]
let test_empty _ =
let empty = P2p_acl.create 10 in
List.iter (fun (_peer,addr) ->
assert_equal_bool ~msg:__LOC__ false (P2p_acl.is_banned_addr empty addr)
) peers ;
Lwt.return ()
let test_ban _ =
let set = P2p_acl.create 10 in
List.iter (fun (_,addr) -> P2p_acl.IPGreylist.add set addr) peers;
List.iter (fun (_,addr) ->
assert_equal_bool ~msg:__LOC__ true (P2p_acl.is_banned_addr set addr)
) peers ;
Lwt.return ()
let test_gc _ =
let set = P2p_acl.create 10 in
List.iter (fun (_,addr) -> P2p_acl.IPGreylist.add set addr) peers;
List.iter (fun (_peer,addr) ->
assert_equal_bool ~msg:__LOC__ true (P2p_acl.is_banned_addr set addr)
) peers ;
Lwt_unix.sleep 3. >>= fun _ ->
(* remove all peers after one second *)
P2p_acl.IPGreylist.gc set ~delay:1. ;
List.iter (fun (_peer,addr) ->
assert_equal_bool ~msg:__LOC__ false (P2p_acl.is_banned_addr set addr)
) peers ;
Lwt.return ()
let () =
let wrap (n, f) =
Alcotest_lwt.test_case n `Quick (fun _ () -> f ()) in
Alcotest.run ~argv:[|""|] "tezos-p2p" [
List.map wrap [
"empty", test_empty ;
"ban", test_ban;
"gc", test_gc;
Normal file
Normal file
@ -0,0 +1,139 @@
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
include Logging.Make (struct let name = "test-p2p-banned_ip" end)
let assert_equal ?(eq = (=)) ?prn ~msg a b =
let msg = match prn with
| None -> msg
| Some prn ->
Format.asprintf "@[<v 2>%s@,n(%a)@,<>@,(%a)@]" msg prn a prn b in
if not (eq a b) then Alcotest.fail msg
let assert_equal_bool = assert_equal
let a = Ipaddr.V6.of_string_exn
let p = Ipaddr.V6.Prefix.of_string_exn
let timenow = Time.now ()
let of_list l =
List.fold_left (fun acc k ->
P2p_acl.IpSet.add_prefix k timenow acc
) P2p_acl.IpSet.empty l
let test_empty _ =
let addrs = List.map a [ "::" ; "ffff::" ; "a::2" ; ] in
List.iter (fun addr ->
assert_equal_bool ~msg:__LOC__ false (P2p_acl.IpSet.mem addr P2p_acl.IpSet.empty)
) addrs
let test_inclusion _ =
let set = P2p_acl.IpSet.add_prefix (p "ffff::/16") timenow P2p_acl.IpSet.empty in
let included = List.map a [ "ffff::3" ; "ffff:ffff::" ; "ffff:00::ff" ; ] in
let not_included = List.map a [ "fffe::3" ; "ffee:ffff::" ; "::" ; ] in
List.iter (fun addr ->
assert_equal_bool ~msg:__LOC__ true (P2p_acl.IpSet.mem addr set)
) included ;
List.iter (fun addr ->
assert_equal_bool ~msg:__LOC__ false (P2p_acl.IpSet.mem addr set)
) not_included;
let set = P2p_acl.IpSet.add_prefix (p "f000::/4") timenow P2p_acl.IpSet.empty in
assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "e000::") set) ;
(* Add one IP *)
let set = P2p_acl.IpSet.add_prefix (p "::/128") timenow P2p_acl.IpSet.empty in
assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "1::") set) ;
let set = P2p_acl.IpSet.add_prefix (p "ffff:eeee::/32") timenow P2p_acl.IpSet.empty in
assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "eeee:ffff::1") set) ;
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "ffff:eeee::1") set) ;
let set = P2p_acl.IpSet.add_prefix (p "::/17") timenow P2p_acl.IpSet.empty in
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "0000:0000::") set) ;
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "0000:7000::") set) ;
assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "0000:8000::1") set) ;
let setlist = [p "e000::/4" ; p "a000::/4" ; p "ffff::/16"] in
let set = of_list setlist in
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "ffff::1") set) ;
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "a111:8000::1") set) ;
let set = of_list [p "e000::/4" ; p "a000::/4" ;
p "1234:5678::1/128"; p "ffff::/16"] in
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "1234:5678::1") set) ;
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem (a "a111:8000::1") set) ;
assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "b111:8000::1") set) ;
assert_equal ~msg:__LOC__ false (P2p_acl.IpSet.mem (a "1234:5678::100") set)
let test_contiguous _ =
let set = of_list [p "::/1" ; p "8000::/1"] in
List.iter (fun addr ->
assert_equal ~msg:__LOC__ true (P2p_acl.IpSet.mem addr set)
) [a "00::" ; a "01::" ; a "ff::" ]
module PSet = Set.Make(Ipaddr.V6.Prefix)
let test_fold _ =
let addr_list = [p "::/1" ; p "8000::/1" ; p "ffff:ffff::/32" ; ] in
let pset = PSet.of_list addr_list in
let ipv6set =
P2p_acl.IpSet.fold (fun prefix _value s ->
PSet.add prefix s
) (of_list addr_list) PSet.empty ;
assert_equal ~eq:PSet.equal ~msg:__LOC__ ipv6set pset
let print_pset ppf pset =
PSet.iter (fun p ->
Format.fprintf ppf "%a " Ipaddr.V6.Prefix.pp_hum p
) pset
let print_list ppf l =
List.iter (fun p ->
Format.fprintf ppf "%a " Ipaddr.V6.Prefix.pp_hum p
) l
let test_to_list _ =
let to_list s = P2p_acl.IpSet.fold (fun k _v acc -> k::acc) s [] in
let list_eq = List.for_all2 (fun x y -> Ipaddr.V6.Prefix.compare x y = 0) in
let assert_equal_set ~msg a b =
let a = List.sort compare a in
let b = List.sort compare (to_list b) in
assert_equal ~prn:print_list ~eq:list_eq ~msg a b in
let set = P2p_acl.IpSet.add_prefix (p "::/0") timenow P2p_acl.IpSet.empty in
assert_equal ~eq:list_eq ~prn:print_list ~msg:__LOC__ [p "::/0"] (to_list set) ;
let set = of_list [p "::/1" ; p "8000::/1"] in
assert_equal ~eq:list_eq ~prn:print_list ~msg:__LOC__ [p "8000::/1"; p "::/1" ] (to_list set) ;
let setlist = [p "1234:5678::/32"] in
let set = of_list setlist in
assert_equal_set ~msg:__LOC__ setlist set ;
let setlist = [p "e000::/4" ; p "a000::/4" ;
p "ffff::/16" ;
p "1234:5678::/32" ;
] in
let set = of_list setlist in
assert_equal_set ~msg:__LOC__ setlist set
let () =
Alcotest.run ~argv:[|""|] "tezos-p2p" [
"p2p.ipv6set", [
"empty", `Quick, test_empty ;
"inclusion", `Quick, test_inclusion ;
"contiguous", `Quick, test_contiguous ;
"test_fold", `Quick, test_fold ;
"to_list", `Quick, test_to_list ;
Normal file
Normal file
@ -0,0 +1,57 @@
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
include Logging.Make (struct let name = "test-p2p-banned_peers" end)
let assert_equal_bool ~msg a b =
if a <> b then Alcotest.fail msg
let a = fun s -> P2p_peer.Id.hash_string [s]
let test_empty _ =
let peers = List.map a [ "foo"; "bar"; "baz" ; ] in
let empty = P2p_acl.PeerRing.create 10 in
List.iter (fun peer ->
assert_equal_bool ~msg:__LOC__ false (P2p_acl.PeerRing.mem empty peer)
) peers
let test_add _ =
let peers = List.map a [ "foo"; "bar"; "baz" ; ] in
let set = P2p_acl.PeerRing.create 10 in
List.iter (fun peer -> P2p_acl.PeerRing.add set peer) peers;
List.iter (fun peer ->
assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set peer)
) peers
let test_remove _ =
let peers = List.map a [ "foo"; "bar"; "baz" ; ] in
let set = P2p_acl.PeerRing.create 10 in
List.iter (fun peer -> P2p_acl.PeerRing.add set peer) peers;
assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set (a "bar"));
P2p_acl.PeerRing.remove set (a "bar");
assert_equal_bool ~msg:__LOC__ false (P2p_acl.PeerRing.mem set (a "bar"))
let test_overflow _ =
let peers = List.map a [ "foo"; "bar"; "baz" ; ] in
let set = P2p_acl.PeerRing.create 3 in
List.iter (fun peer -> P2p_acl.PeerRing.add set peer) peers;
assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set (a "baz"));
P2p_acl.PeerRing.add set (a "zor");
assert_equal_bool ~msg:__LOC__ true (P2p_acl.PeerRing.mem set (a "zor"));
assert_equal_bool ~msg:__LOC__ false (P2p_acl.PeerRing.mem set (a "baz"))
let () =
Alcotest.run ~argv:[|""|] "tezos-p2p" [
"p2p.peerset", [
"empty", `Quick, test_empty ;
"add", `Quick, test_add;
"overflow", `Quick, test_overflow;
"remove", `Quick, test_remove;
@ -71,6 +71,7 @@ let detach_node f points n =
max_incoming_connections = nb_points ;
max_incoming_connections = nb_points ;
connection_timeout = 10. ;
connection_timeout = 10. ;
authentication_timeout = 2. ;
authentication_timeout = 2. ;
greylist_timeout = 2. ;
incoming_app_message_queue_size = None ;
incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ;
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
outgoing_message_queue_size = None ;
@ -484,8 +484,10 @@ module P2p_reader = struct
(State.Block.known_invalid chain_db.chain_state)
(State.Block.known_invalid chain_db.chain_state)
(Block_header.hash head :: hist) >>= fun known_invalid ->
(Block_header.hash head :: hist) >>= fun known_invalid ->
if not known_invalid then
if not known_invalid then
chain_db.callback.notify_branch state.gid locator ;
chain_db.callback.notify_branch state.gid locator
(* TODO Kickban *)
(* Kickban *)
P2p.temp_ban_peer global_db.p2p state.gid;
| Deactivate chain_id ->
| Deactivate chain_id ->
@ -508,8 +510,10 @@ module P2p_reader = struct
let head = Block_header.hash header in
let head = Block_header.hash header in
State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid ->
State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid ->
if not known_invalid then
if not known_invalid then
chain_db.callback.notify_head state.gid header mempool ;
chain_db.callback.notify_head state.gid header mempool
(* TODO Kickban *)
(* Kickban *)
P2p.temp_ban_peer global_db.p2p state.gid ;
| Get_block_headers hashes ->
| Get_block_headers hashes ->
@ -764,6 +768,9 @@ let get_chain { active_chains } chain_id =
try Some (Chain_id.Table.find active_chains chain_id)
try Some (Chain_id.Table.find active_chains chain_id)
with Not_found -> None
with Not_found -> None
let temp_ban { global_db = { p2p } } peer_id =
Lwt.return (P2p.temp_ban_peer p2p peer_id)
let disconnect { global_db = { p2p } } peer_id =
let disconnect { global_db = { p2p } } peer_id =
match P2p.find_connection p2p peer_id with
match P2p.find_connection p2p peer_id with
| None -> Lwt.return_unit
| None -> Lwt.return_unit
@ -54,6 +54,9 @@ val set_callback: chain_db -> callback -> unit
(** Kick a given peer. *)
(** Kick a given peer. *)
val disconnect: chain_db -> P2p_peer.Id.t -> unit Lwt.t
val disconnect: chain_db -> P2p_peer.Id.t -> unit Lwt.t
(** Temporarily ban of a given peer. *)
val temp_ban: chain_db -> P2p_peer.Id.t -> unit Lwt.t
(** Various accessors. *)
(** Various accessors. *)
val chain_state: chain_db -> State.Chain.t
val chain_state: chain_db -> State.Chain.t
val db: chain_db -> db
val db: chain_db -> db
@ -251,7 +251,7 @@ let on_error w r st errs =
((( Validation_errors.Unknown_ancestor
((( Validation_errors.Unknown_ancestor
| Validation_errors.Invalid_locator _
| Validation_errors.Invalid_locator _
| Block_validator_errors.Invalid_block _ ) :: _) as errors ) ->
| Block_validator_errors.Invalid_block _ ) :: _) as errors ) ->
(* TODO ban the peer_id... *)
Distributed_db.temp_ban pv.parameters.chain_db pv.peer_id >>= fun () ->
debug w
debug w
"Terminating the validation worker for peer %a (kickban)."
"Terminating the validation worker for peer %a (kickban)."
P2p_peer.Id.pp_short pv.peer_id ;
P2p_peer.Id.pp_short pv.peer_id ;
@ -267,7 +267,7 @@ let on_error w r st errs =
protocol >>= function
protocol >>= function
| Ok _ -> return ()
| Ok _ -> return ()
| Error _ ->
| Error _ ->
(* TODO penality... *)
(* TODO: punish *)
debug w
debug w
"Terminating the validation worker for peer %a \
"Terminating the validation worker for peer %a \
(missing protocol %a)."
(missing protocol %a)."
@ -145,6 +145,8 @@ type error += Connection_refused
type error += Rejected of P2p_peer.Id.t
type error += Rejected of P2p_peer.Id.t
type error += Too_many_connections
type error += Too_many_connections
type error += Closed_network
type error += Closed_network
type error += Point_banned of P2p_point.Id.t
type error += Peer_banned of P2p_peer.Id.t
let () =
let () =
(* Pending connection *)
(* Pending connection *)
@ -207,4 +209,30 @@ let () =
~pp:(fun ppf () -> Format.fprintf ppf "Network is closed.")
~pp:(fun ppf () -> Format.fprintf ppf "Network is closed.")
(function Closed_network -> Some () | _ -> None)
(function Closed_network -> Some () | _ -> None)
(fun () -> Closed_network)
(fun () -> Closed_network) ;
(* Point Banned *)
~title:"Point Banned"
~description:"The addr you tried to connect is banned."
~pp:(fun ppf point ->
Format.fprintf ppf
"The addr you tried to connect (%a) is banned."
P2p_addr.pp (fst point))
Data_encoding.(obj1 (req "point" P2p_point.Id.encoding))
(function Point_banned point -> Some point | _ -> None)
(fun point -> Point_banned point) ;
(* Peer Banned *)
~title:"Peer Banned"
~description:"The peer identity you tried to connect is banned."
~pp:(fun ppf peer_id ->
Format.fprintf ppf
"The peer identity you tried to connect (%a) is banned."
P2p_peer.Id.pp peer_id)
Data_encoding.(obj1 (req "peer" P2p_peer.Id.encoding))
(function Peer_banned peer_id -> Some peer_id | _ -> None)
(fun peer_id -> Peer_banned peer_id)
@ -15,7 +15,7 @@ module S = struct
~query: RPC_query.empty
~query: RPC_query.empty
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: (Data_encoding.list P2p_version.encoding)
~output: (Data_encoding.list P2p_version.encoding)
RPC_path.(root / "p2p" / "versions")
RPC_path.(root / "network" / "versions")
let stat =
let stat =
@ -23,7 +23,7 @@ module S = struct
~query: RPC_query.empty
~query: RPC_query.empty
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: P2p_stat.encoding
~output: P2p_stat.encoding
RPC_path.(root / "p2p" / "stat")
RPC_path.(root / "network" / "stat")
let events =
let events =
@ -31,7 +31,7 @@ module S = struct
~query: RPC_query.empty
~query: RPC_query.empty
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: P2p_connection.Pool_event.encoding
~output: P2p_connection.Pool_event.encoding
RPC_path.(root / "p2p" / "log")
RPC_path.(root / "network" / "log")
let connect =
let connect =
@ -39,7 +39,7 @@ module S = struct
~query: RPC_query.empty
~query: RPC_query.empty
~input: Data_encoding.(obj1 (dft "timeout" float 5.))
~input: Data_encoding.(obj1 (dft "timeout" float 5.))
~output: Data_encoding.empty
~output: Data_encoding.empty
RPC_path.(root / "p2p" / "connect" /: P2p_point.Id.rpc_arg)
RPC_path.(root / "network" / "connect" /: P2p_point.Id.rpc_arg)
@ -62,7 +62,7 @@ module Connections = struct
~query: RPC_query.empty
~query: RPC_query.empty
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: (Data_encoding.list P2p_connection.Info.encoding)
~output: (Data_encoding.list P2p_connection.Info.encoding)
RPC_path.(root / "p2p" / "connections")
RPC_path.(root / "network" / "connections")
let info =
let info =
@ -70,7 +70,7 @@ module Connections = struct
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: P2p_connection.Info.encoding
~output: P2p_connection.Info.encoding
~description:"Details about the current P2P connection to the given peer."
~description:"Details about the current P2P connection to the given peer."
RPC_path.(root / "p2p" / "connections" /: P2p_peer.Id.rpc_arg)
RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg)
let kick =
let kick =
@ -78,7 +78,7 @@ module Connections = struct
~input: Data_encoding.(obj1 (req "wait" bool))
~input: Data_encoding.(obj1 (req "wait" bool))
~output: Data_encoding.empty
~output: Data_encoding.empty
~description:"Forced close of the current P2P connection to the given peer."
~description:"Forced close of the current P2P connection to the given peer."
RPC_path.(root / "p2p" / "connections" /: P2p_peer.Id.rpc_arg / "kick")
RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg / "kick")
@ -98,7 +98,7 @@ module Points = struct
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: P2p_point.Info.encoding
~output: P2p_point.Info.encoding
~description: "Details about a given `IP:addr`."
~description: "Details about a given `IP:addr`."
RPC_path.(root / "p2p" / "points" /: P2p_point.Id.rpc_arg)
RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg)
let events =
let events =
@ -107,7 +107,7 @@ module Points = struct
~output: (Data_encoding.list
~output: (Data_encoding.list
~description: "Monitor network events related to an `IP:addr`."
~description: "Monitor network events related to an `IP:addr`."
RPC_path.(root / "p2p" / "points" /: P2p_point.Id.rpc_arg / "log")
RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "log")
let list =
let list =
let filter =
let filter =
@ -121,8 +121,42 @@ module Points = struct
~description:"List the pool of known `IP:port` \
~description:"List the pool of known `IP:port` \
used for establishing P2P connections ."
used for establishing P2P connections."
RPC_path.(root / "networks" / "point")
RPC_path.(root / "network" / "points")
let forget =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Remove the given address from the whitelist/blacklist."
RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "forget" )
let ban =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Ban the given address permanently."
RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "ban" )
let trust =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Trust a given address permanently. \
Connections from this address can still be closed \
on authentication if the peer is banned or blocked."
RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "trust" )
let is_banned =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.bool
~description:"Check is a given address is blocked, permanently or temporarily."
RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "isbanned" )
@ -131,6 +165,10 @@ module Points = struct
let events ctxt point =
let events ctxt point =
make_streamed_call S.events ctxt ((), point) () true
make_streamed_call S.events ctxt ((), point) () true
let list ?(filter = []) ctxt = make_call S.list ctxt () () filter
let list ?(filter = []) ctxt = make_call S.list ctxt () () filter
let forget ctxt peer_id = make_call1 S.forget ctxt peer_id () ()
let ban ctxt peer_id = make_call1 S.ban ctxt peer_id () ()
let trust ctxt peer_id = make_call1 S.trust ctxt peer_id () ()
let is_banned ctxt peer_id = make_call1 S.is_banned ctxt peer_id () ()
@ -144,7 +182,7 @@ module Peers = struct
~input: Data_encoding.empty
~input: Data_encoding.empty
~output: P2p_peer.Info.encoding
~output: P2p_peer.Info.encoding
~description:"Details about a given peer."
~description:"Details about a given peer."
RPC_path.(root / "p2p" / "peers" /: P2p_peer.Id.rpc_arg)
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg)
let events =
let events =
@ -153,7 +191,7 @@ module Peers = struct
~output: (Data_encoding.list
~output: (Data_encoding.list
~description:"Monitor network events related to a given peer."
~description:"Monitor network events related to a given peer."
RPC_path.(root / "p2p" / "peers" /: P2p_peer.Id.rpc_arg / "log")
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "log")
let list =
let list =
let filter =
let filter =
@ -167,7 +205,40 @@ module Peers = struct
~description:"List the peers the node ever met."
~description:"List the peers the node ever met."
RPC_path.(root / "p2p" / "peers")
RPC_path.(root / "network" / "peers")
let forget =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Remove the given peer from the whitelist/blacklist."
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "forget" )
let ban =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Ban the given peer permanently."
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "ban" )
let trust =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Trust a given peer permanently: \
the peer cannot be blocked (but its machine's IP still can)."
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "trust" )
let is_banned =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.bool
~description:"Check is a given peer is blocked, permanently or temporarily."
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "isbanned" )
@ -175,5 +246,26 @@ module Peers = struct
let events ctxt point =
let events ctxt point =
make_streamed_call S.events ctxt ((), point) () true
make_streamed_call S.events ctxt ((), point) () true
let list ?(filter = []) ctxt = make_call S.list ctxt () () filter
let list ?(filter = []) ctxt = make_call S.list ctxt () () filter
let forget ctxt point_id = make_call1 S.forget ctxt point_id () ()
let ban ctxt point_id = make_call1 S.ban ctxt point_id () ()
let trust ctxt point_id = make_call1 S.trust ctxt point_id () ()
let is_banned ctxt point_id = make_call1 S.is_banned ctxt point_id () ()
module Greylist = struct
module S = struct
let clear =
~query: RPC_query.empty
~input: Data_encoding.empty
~output: Data_encoding.empty
~description:"Clear all greylists tables."
RPC_path.(root / "network" / "greylist" / "clear" )
let clear ctxt = make_call S.clear ctxt () ()
@ -87,6 +87,14 @@ module Points : sig
P2p_point.Id.t ->
P2p_point.Id.t ->
(P2p_point.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t
(P2p_point.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t
val forget : #simple -> P2p_point.Id.t -> unit tzresult Lwt.t
val ban: #simple -> P2p_point.Id.t -> unit tzresult Lwt.t
val trust: #simple -> P2p_point.Id.t -> unit tzresult Lwt.t
val is_banned: #simple -> P2p_point.Id.t -> bool tzresult Lwt.t
module S : sig
module S : sig
val list :
val list :
@ -104,6 +112,27 @@ module Points : sig
unit * P2p_point.Id.t, unit, bool,
unit * P2p_point.Id.t, unit, bool,
P2p_point.Pool_event.t list) RPC_service.t
P2p_point.Pool_event.t list) RPC_service.t
val forget :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, unit,
unit) RPC_service.t
val ban :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, unit,
unit) RPC_service.t
val trust :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, unit,
unit) RPC_service.t
val is_banned :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, unit,
bool) RPC_service.t
@ -122,6 +151,14 @@ module Peers : sig
#streamed -> P2p_peer.Id.t ->
#streamed -> P2p_peer.Id.t ->
(P2p_peer.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t
(P2p_peer.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t
val forget : #simple -> P2p_peer.Id.t -> unit tzresult Lwt.t
val ban: #simple -> P2p_peer.Id.t -> unit tzresult Lwt.t
val trust: #simple -> P2p_peer.Id.t -> unit tzresult Lwt.t
val is_banned: #simple -> P2p_peer.Id.t -> bool tzresult Lwt.t
module S : sig
module S : sig
val list :
val list :
@ -139,6 +176,41 @@ module Peers : sig
unit * P2p_peer.Id.t, unit, bool,
unit * P2p_peer.Id.t, unit, bool,
P2p_peer.Pool_event.t list) RPC_service.t
P2p_peer.Pool_event.t list) RPC_service.t
val forget :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
unit) RPC_service.t
val ban :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
unit) RPC_service.t
val trust :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
unit) RPC_service.t
val is_banned :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
bool) RPC_service.t
module Greylist : sig
val clear: #simple -> unit -> unit tzresult Lwt.t
module S : sig
val clear :
([ `POST ], unit,
unit, unit, unit,
unit) RPC_service.t
@ -7,69 +7,136 @@
(* *)
(* *)
type 'a raw =
module Ring = struct
| Empty of int
type 'a raw =
| Inited of {
| Empty of int
data : 'a array ;
| Inited of {
mutable pos : int ;
data : 'a array ;
mutable pos : int ;
type 'a t = 'a raw ref
type 'a t = 'a raw ref
let create size = ref (Empty size)
let create size = ref (Empty size)
let add r v =
let add r v =
match !r with
match !r with
| Empty size ->
| Empty size ->
r := Inited { data = Array.make size v ; pos = 0 }
r := Inited { data = Array.make size v ; pos = 0 }
| Inited s ->
s.pos <-
if s.pos = 2 * Array.length s.data - 1 then
Array.length s.data
s.pos + 1 ;
s.data.(s.pos mod Array.length s.data) <- v
let add_and_return_erased r v =
let replaced = match !r with
| Empty _ -> None
| Inited s ->
| Inited s ->
if s.pos >= Array.length s.data - 1 then
s.pos <-
Some (s.data.(s.pos mod Array.length s.data))
if s.pos = 2 * Array.length s.data - 1 then
Array.length s.data
None in
add r v ; replaced
s.pos + 1 ;
s.data.(s.pos mod Array.length s.data) <- v
let clear r =
let add_and_return_erased r v =
match !r with
let replaced = match !r with
| Empty _ -> ()
| Empty _ -> None
| Inited { data ; _ } ->
| Inited s ->
r := Empty (Array.length data)
if s.pos >= Array.length s.data - 1 then
Some (s.data.(s.pos mod Array.length s.data))
None in
add r v ; replaced
let add_list r l = List.iter (add r) l
let clear r =
match !r with
| Empty _ -> ()
| Inited { data ; _ } ->
r := Empty (Array.length data)
let last r =
match !r with
| Empty _ -> None
| Inited { data ; pos } -> Some data.(pos mod Array.length data)
let fold r ~init ~f =
let add_list r l = List.iter (add r) l
match !r with
| Empty _ -> init
| Inited { data ; pos } ->
let size = Array.length data in
let acc = ref init in
for i = 0 to min pos (size - 1) do
acc := f !acc data.((pos - i) mod size)
done ;
let elements t =
let last r =
fold t ~init:[] ~f:(fun acc elt -> elt :: acc)
match !r with
| Empty _ -> None
| Inited { data ; pos } -> Some data.(pos mod Array.length data)
exception Empty
let fold r ~init ~f =
match !r with
| Empty _ -> init
| Inited { data ; pos } ->
let size = Array.length data in
let acc = ref init in
for i = 0 to min pos (size - 1) do
acc := f !acc data.((pos - i) mod size)
done ;
let last_exn r =
let elements t =
match last r with
fold t ~init:[] ~f:(fun acc elt -> elt :: acc)
| None -> raise Empty
| Some d -> d
exception Empty
let last_exn r =
match last r with
| None -> raise Empty
| Some d -> d
include Ring
(** Ring Buffer Table *)
module type TABLE = sig
type t
type v
val create : int -> t
val clear : t -> unit
val add : t -> v -> unit
val mem : t -> v -> bool
val remove : t -> v -> unit
val elements : t -> v list
(* fixed size set of Peers id. If the set exceed the maximal allowed capacity, the
element that was added first is removed when a new one is added *)
module MakeTable (V: Hashtbl.HashedType) = struct
module Table = Hashtbl.Make(V)
type raw = {
size : int ;
mutable capacity : int ;
ring : V.t Ring.t ;
table : unit Table.t ;
type t = raw ref
type v = V.t
let create size = ref {
capacity = 0;
ring = Ring.create size;
table = Table.create size }
let add ({contents = t } as tt) v =
assert (t.capacity <= t.size);
if t.capacity = t.size then begin
try Table.remove t.table (Ring.last_exn t.ring) with Ring.Empty -> assert false
Ring.add t.ring v;
Table.replace t.table v ();
let capacity = if t.capacity = t.size then t.capacity else t.capacity + 1 in
tt := { ring = t.ring ; table = t.table; size = t.size ; capacity = capacity }
let mem t v = Table.mem !t.table v
let remove ({contents = t } as tt) v =
Table.remove t.table v;
let capacity = if t.capacity = 0 then t.capacity else t.capacity - 1 in
tt := { ring = t.ring ; table = t.table; size = t.size ; capacity = capacity }
let clear ({contents = t } as tt) =
tt := { t with
capacity = 0;
ring = Ring.create t.size;
table = Table.create t.size
let elements t = Ring.elements !t.ring
@ -13,6 +13,8 @@
a fixed number of values of a same type. Values are never removed,
a fixed number of values of a same type. Values are never removed,
once the limit is reached, adding a value replaces the oldest one
once the limit is reached, adding a value replaces the oldest one
in the ring buffer. *)
in the ring buffer. *)
exception Empty
type 'a t
type 'a t
(** Allocates a ring buffer for a given number of values. *)
(** Allocates a ring buffer for a given number of values. *)
@ -33,8 +35,6 @@ val clear : 'a t -> unit
(** Retrieves the most recent value, or [None] when empty. *)
(** Retrieves the most recent value, or [None] when empty. *)
val last : 'a t -> 'a option
val last : 'a t -> 'a option
exception Empty
(** Same as {!last}, but raises {!Empty} when empty. *)
(** Same as {!last}, but raises {!Empty} when empty. *)
val last_exn : 'a t -> 'a
val last_exn : 'a t -> 'a
@ -43,3 +43,31 @@ val fold : 'a t -> init:'b -> f:('b -> 'a -> 'b) -> 'b
(** Retrieves the elements as a list, oldest first.. *)
(** Retrieves the elements as a list, oldest first.. *)
val elements : 'a t -> 'a list
val elements : 'a t -> 'a list
(** Ring Buffer Table *)
module type TABLE = sig
type t
type v
(** [create size] inizialize an empty ring *)
val create : int -> t
(** [retest t] remore all bindings from the current ring *)
val clear : t -> unit
(** [add t v] add a value to the ring. If the ring already contains size elements,
the first element is removed and [v] is added. *)
val add : t -> v -> unit
(** [mem t v] check if v is in the ring. O(1) *)
val mem : t -> v -> bool
(** [remove t v] remove one element from the table *)
val remove : t -> v -> unit
(** [elements t] return the list of elements currently in the ring *)
val elements : t -> v list
module MakeTable (V: Hashtbl.HashedType) : TABLE with type v = V.t
Reference in New Issue
Block a user