P2p: Add option to disable the mempool

This allows a node to specify that it doesn't want to receive operations
that are not included in a block. To do so, one should use the new
--disable-mempool option.

A disabled mempool node announce its configuration during the ACK exchange.
This configuration should be easily expandable with other options.

Node will keep track of the ack exchange configuration for each nodes it
is connected to and will not propagate its mempool to disabled mempool
nodes.

A disabled mempool node will ignore non-empty mempool received.
In the future, this incident should be punish by a decrement of its score
and / or ban.
This commit is contained in:
michael 2018-05-09 19:19:56 +02:00 committed by Grégoire Henry
parent 5b21610985
commit d5925f088c
10 changed files with 103 additions and 23 deletions

View File

@ -31,6 +31,7 @@ and p2p = {
listen_addr : string option ; listen_addr : string option ;
closed : bool ; closed : bool ;
limits : P2p.limits ; limits : P2p.limits ;
disable_mempool : bool ;
} }
and rpc = { and rpc = {
@ -90,6 +91,7 @@ let default_p2p = {
listen_addr = Some ("[::]:" ^ string_of_int default_p2p_port) ; listen_addr = Some ("[::]:" ^ string_of_int default_p2p_port) ;
closed = false ; closed = false ;
limits = default_p2p_limits ; limits = default_p2p_limits ;
disable_mempool = false ;
} }
let default_rpc = { let default_rpc = {
@ -274,14 +276,14 @@ let p2p =
let open Data_encoding in let open Data_encoding in
conv conv
(fun { expected_pow ; bootstrap_peers ; (fun { expected_pow ; bootstrap_peers ;
listen_addr ; closed ; limits } -> listen_addr ; closed ; limits ; disable_mempool } ->
( expected_pow, bootstrap_peers, ( expected_pow, bootstrap_peers,
listen_addr, closed, limits )) listen_addr, closed, limits, disable_mempool ))
(fun ( expected_pow, bootstrap_peers, (fun ( expected_pow, bootstrap_peers,
listen_addr, closed, limits ) -> listen_addr, closed, limits, disable_mempool ) ->
{ expected_pow ; bootstrap_peers ; { expected_pow ; bootstrap_peers ;
listen_addr ; closed ; limits }) listen_addr ; closed ; limits ; disable_mempool })
(obj5 (obj6
(dft "expected-proof-of-work" (dft "expected-proof-of-work"
~description: "Floating point number between 0 and 256 that represents a \ ~description: "Floating point number between 0 and 256 that represents a \
difficulty, 24 signifies for example that at least 24 leading \ difficulty, 24 signifies for example that at least 24 leading \
@ -303,6 +305,13 @@ let p2p =
(dft "limits" (dft "limits"
~description: "Network limits" ~description: "Network limits"
limit default_p2p_limits) limit default_p2p_limits)
(dft "disable_mempool"
~description: "If set to [true], the node will not participate in \
the propagation of pending operations (mempool). \
Default value is [false]. \
It can be used to decrease the memory and \
computation footprints of the node."
bool false)
) )
let rpc : rpc Data_encoding.t = let rpc : rpc Data_encoding.t =
@ -566,6 +575,7 @@ let update
?listen_addr ?listen_addr
?rpc_listen_addr ?rpc_listen_addr
?(closed = false) ?(closed = false)
?(disable_mempool = false)
?(cors_origins = []) ?(cors_origins = [])
?(cors_headers = []) ?(cors_headers = [])
?rpc_tls ?rpc_tls
@ -616,6 +626,7 @@ let update
Option.first_some listen_addr cfg.p2p.listen_addr ; Option.first_some listen_addr cfg.p2p.listen_addr ;
closed = cfg.p2p.closed || closed ; closed = cfg.p2p.closed || closed ;
limits ; limits ;
disable_mempool = cfg.p2p.disable_mempool || disable_mempool ;
} }
and rpc : rpc = { and rpc : rpc = {
listen_addr = listen_addr =

View File

@ -21,6 +21,7 @@ and p2p = {
listen_addr : string option ; listen_addr : string option ;
closed : bool ; closed : bool ;
limits : P2p.limits ; limits : P2p.limits ;
disable_mempool : bool ;
} }
and rpc = { and rpc = {
@ -69,6 +70,7 @@ val update:
?listen_addr:string -> ?listen_addr:string ->
?rpc_listen_addr:string -> ?rpc_listen_addr:string ->
?closed:bool -> ?closed:bool ->
?disable_mempool:bool ->
?cors_origins:string list -> ?cors_origins:string list ->
?cors_headers:string list -> ?cors_headers:string list ->
?rpc_tls:tls -> ?rpc_tls:tls ->

View File

@ -169,6 +169,7 @@ let init_node ?sandbox (config : Node_config_file.t) =
identity ; identity ;
proof_of_work_target = proof_of_work_target =
Crypto_box.make_target config.p2p.expected_pow ; Crypto_box.make_target config.p2p.expected_pow ;
disable_mempool = config.p2p.disable_mempool ;
} }
in in
return (Some (p2p_config, config.p2p.limits)) return (Some (p2p_config, config.p2p.limits))

View File

@ -28,6 +28,7 @@ type t = {
listen_addr: string option ; listen_addr: string option ;
rpc_listen_addr: string option ; rpc_listen_addr: string option ;
closed: bool ; closed: bool ;
disable_mempool: bool ;
cors_origins: string list ; cors_origins: string list ;
cors_headers: string list ; cors_headers: string list ;
rpc_tls: Node_config_file.tls option ; rpc_tls: Node_config_file.tls option ;
@ -39,8 +40,8 @@ let wrap
data_dir config_file data_dir config_file
connections max_download_speed max_upload_speed binary_chunks_size connections max_download_speed max_upload_speed binary_chunks_size
peer_table_size peer_table_size
listen_addr peers no_bootstrap_peers bootstrap_threshold closed expected_pow listen_addr peers no_bootstrap_peers bootstrap_threshold closed disable_mempool
rpc_listen_addr rpc_tls expected_pow rpc_listen_addr rpc_tls
cors_origins cors_headers log_output = cors_origins cors_headers log_output =
let actual_data_dir = let actual_data_dir =
@ -80,6 +81,7 @@ let wrap
listen_addr ; listen_addr ;
rpc_listen_addr ; rpc_listen_addr ;
closed ; closed ;
disable_mempool ;
cors_origins ; cors_origins ;
cors_headers ; cors_headers ;
rpc_tls ; rpc_tls ;
@ -212,6 +214,15 @@ module Term = struct
"Only accept connections from the configured bootstrap peers." in "Only accept connections from the configured bootstrap peers." in
Arg.(value & flag & info ~docs ~doc ["closed"]) Arg.(value & flag & info ~docs ~doc ["closed"])
let disable_mempool =
let doc =
"If set to [true], the node will not participate in the propagation \
of pending operations (mempool). \
Default value is [false]. \
It can be used to decrease the memory and computation footprints \
of the node." in
Arg.(value & flag & info ~docs ~doc ["disable-mempool"])
(* rpc args *) (* rpc args *)
let docs = Manpage.rpc_section let docs = Manpage.rpc_section
@ -249,8 +260,8 @@ module Term = struct
$ connections $ connections
$ max_download_speed $ max_upload_speed $ binary_chunks_size $ max_download_speed $ max_upload_speed $ binary_chunks_size
$ peer_table_size $ peer_table_size
$ listen_addr $ peers $ no_bootstrap_peers $ bootstrap_threshold $ closed $ expected_pow $ listen_addr $ peers $ no_bootstrap_peers $ bootstrap_threshold $ closed $ disable_mempool
$ rpc_listen_addr $ rpc_tls $ expected_pow $ rpc_listen_addr $ rpc_tls
$ cors_origins $ cors_headers $ cors_origins $ cors_headers
$ log_output $ log_output
@ -270,6 +281,7 @@ let read_and_patch_config_file ?(ignore_bootstrap_peers=false) args =
expected_pow ; expected_pow ;
peers ; no_bootstrap_peers ; peers ; no_bootstrap_peers ;
listen_addr ; closed ; listen_addr ; closed ;
disable_mempool ;
rpc_listen_addr ; rpc_tls ; rpc_listen_addr ; rpc_tls ;
cors_origins ; cors_headers ; cors_origins ; cors_headers ;
log_output ; log_output ;
@ -287,5 +299,5 @@ let read_and_patch_config_file ?(ignore_bootstrap_peers=false) args =
?max_download_speed ?max_upload_speed ?binary_chunks_size ?max_download_speed ?max_upload_speed ?binary_chunks_size
?peer_table_size ?expected_pow ?peer_table_size ?expected_pow
~bootstrap_peers ?listen_addr ?rpc_listen_addr ~bootstrap_peers ?listen_addr ?rpc_listen_addr
~closed ~cors_origins ~cors_headers ?rpc_tls ?log_output ~closed ~disable_mempool ~cors_origins ~cors_headers ?rpc_tls ?log_output
?bootstrap_threshold cfg ?bootstrap_threshold cfg

View File

@ -23,6 +23,7 @@ type t = {
listen_addr: string option ; listen_addr: string option ;
rpc_listen_addr: string option ; rpc_listen_addr: string option ;
closed: bool ; closed: bool ;
disable_mempool: bool ;
cors_origins: string list ; cors_origins: string list ;
cors_headers: string list ; cors_headers: string list ;
rpc_tls: Node_config_file.tls option ; rpc_tls: Node_config_file.tls option ;

View File

@ -42,6 +42,7 @@ type config = {
closed_network : bool ; closed_network : bool ;
identity : P2p_identity.t ; identity : P2p_identity.t ;
proof_of_work_target : Crypto_box.target ; proof_of_work_target : Crypto_box.target ;
disable_mempool : bool ;
} }
type limits = { type limits = {

View File

@ -67,6 +67,8 @@ type config = {
proof_of_work_target : Crypto_box.target ; proof_of_work_target : Crypto_box.target ;
(** Expected level of proof of work of peers' identity. *) (** Expected level of proof of work of peers' identity. *)
disable_mempool : bool ;
(** If [true], all non-empty mempools will be ignored. *)
} }
(** Network capacities *) (** Network capacities *)

View File

@ -305,6 +305,7 @@ type db = {
protocol_db: Raw_protocol.t ; protocol_db: Raw_protocol.t ;
block_input: (Block_hash.t * Block_header.t) Lwt_watcher.input ; block_input: (Block_hash.t * Block_header.t) Lwt_watcher.input ;
operation_input: (Operation_hash.t * Operation.t) Lwt_watcher.input ; operation_input: (Operation_hash.t * Operation.t) Lwt_watcher.input ;
connection_metadata_value: (P2p_peer.Id.t -> Connection_metadata.t)
} }
and chain_db = { and chain_db = {
@ -497,7 +498,15 @@ module P2p_reader = struct
| Get_current_head chain_id -> | Get_current_head chain_id ->
may_handle state chain_id @@ fun chain_db -> may_handle state chain_id @@ fun chain_db ->
State.Current_mempool.get chain_db.chain_state >>= fun (head, mempool) -> let { Connection_metadata.disable_mempool } =
P2p.connection_metadata chain_db.global_db.p2p state.conn in
begin
if disable_mempool then
Chain.head chain_db.chain_state >>= fun head ->
Lwt.return (State.Block.header head, Mempool.empty)
else
State.Current_mempool.get chain_db.chain_state
end >>= fun (head, mempool) ->
(* TODO bound the sent mempool size *) (* TODO bound the sent mempool size *)
ignore ignore
@@ P2p.try_send global_db.p2p state.conn @@ P2p.try_send global_db.p2p state.conn
@ -508,6 +517,15 @@ module P2p_reader = struct
may_handle state chain_id @@ fun chain_db -> may_handle state chain_id @@ fun chain_db ->
let head = Block_header.hash header in let head = Block_header.hash header in
State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid -> State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid ->
let { Connection_metadata.disable_mempool } =
chain_db.global_db.connection_metadata_value state.gid in
let known_invalid =
known_invalid ||
(disable_mempool && mempool <> Mempool.empty)
(* A non-empty mempool was received while mempool is desactivated,
so the message is ignored.
This should probably warrant a reduction of the sender's score. *)
in
if not known_invalid then if not known_invalid then
chain_db.callback.notify_head state.gid header mempool chain_db.callback.notify_head state.gid header mempool
else else
@ -690,7 +708,7 @@ let raw_try_send p2p peer_id msg =
| Some conn -> ignore (P2p.try_send p2p conn msg : bool) | Some conn -> ignore (P2p.try_send p2p conn msg : bool)
let create disk p2p = let create disk p2p connection_metadata_value =
let global_request = let global_request =
{ data = () ; { data = () ;
active = active_peer_ids p2p ; active = active_peer_ids p2p ;
@ -704,7 +722,8 @@ let create disk p2p =
let db = let db =
{ p2p ; p2p_readers ; disk ; { p2p ; p2p_readers ; disk ;
active_chains ; protocol_db ; active_chains ; protocol_db ;
block_input ; operation_input } in block_input ; operation_input ;
connection_metadata_value } in
P2p.on_new_connection p2p (P2p_reader.run db) ; P2p.on_new_connection p2p (P2p_reader.run db) ;
P2p.iter_connections p2p (P2p_reader.run db) ; P2p.iter_connections p2p (P2p_reader.run db) ;
db db
@ -935,8 +954,26 @@ module Advertise = struct
let current_head chain_db ?peer ?(mempool = Mempool.empty) head = let current_head chain_db ?peer ?(mempool = Mempool.empty) head =
let chain_id = State.Chain.id chain_db.chain_state in let chain_id = State.Chain.id chain_db.chain_state in
assert (Chain_id.equal chain_id (State.Block.chain_id head)) ; assert (Chain_id.equal chain_id (State.Block.chain_id head)) ;
send chain_db ?peer @@ let msg_mempool =
Current_head (chain_id, State.Block.header head, mempool) Message.Current_head (chain_id, State.Block.header head, mempool) in
if mempool = Mempool.empty then
send chain_db ?peer msg_mempool
else
let msg_disable_mempool =
Message.Current_head (chain_id, State.Block.header head, Mempool.empty) in
let send_mempool state =
let { Connection_metadata.disable_mempool } =
P2p.connection_metadata chain_db.global_db.p2p state.conn in
let msg = if disable_mempool then msg_disable_mempool else msg_mempool in
ignore @@ P2p.try_send chain_db.global_db.p2p state.conn msg
in
match peer with
| Some receiver_id ->
let state = P2p_peer.Table.find chain_db.active_connections receiver_id in
send_mempool state
| None ->
List.iter (fun (_receiver_id, state) -> send_mempool state)
(P2p_peer.Table.fold (fun k v acc -> (k,v)::acc) chain_db.active_connections [])
let current_branch ?peer chain_db = let current_branch ?peer chain_db =
let chain_id = State.Chain.id chain_db.chain_state in let chain_id = State.Chain.id chain_db.chain_state in

View File

@ -18,7 +18,7 @@ module Message = Distributed_db_message
type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net
val create: State.t -> p2p -> t val create: State.t -> p2p -> (P2p_peer.Id.t -> Connection_metadata.t) -> t
val state: db -> State.t val state: db -> State.t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t

View File

@ -71,25 +71,37 @@ let peer_metadata_cfg : _ P2p.peer_meta_config = {
score = fun _ -> 0. ; score = fun _ -> 0. ;
} }
let connection_metadata_cfg : _ P2p.conn_meta_config = { let connection_metadata_cfg cfg : _ P2p.conn_meta_config = {
conn_meta_encoding = Connection_metadata.encoding ; conn_meta_encoding = Connection_metadata.encoding ;
conn_meta_value = fun _ -> { disable_mempool = false ; private_node = false} ; conn_meta_value = fun _ -> cfg;
} }
let init_p2p p2p_params = let init_p2p p2p_params =
match p2p_params with match p2p_params with
| None -> | None ->
let conn_metadata_cfg =
connection_metadata_cfg {
Connection_metadata.
disable_mempool = false ;
private_node = false ;
} in
lwt_log_notice "P2P layer is disabled" >>= fun () -> lwt_log_notice "P2P layer is disabled" >>= fun () ->
return (P2p.faked_network peer_metadata_cfg) return (P2p.faked_network peer_metadata_cfg, conn_metadata_cfg)
| Some (config, limits) -> | Some (config, limits) ->
let conn_metadata_cfg =
connection_metadata_cfg {
Connection_metadata.
disable_mempool = config.P2p.disable_mempool ;
private_node = false ;
} in
lwt_log_notice "bootstraping chain..." >>= fun () -> lwt_log_notice "bootstraping chain..." >>= fun () ->
P2p.create P2p.create
~config ~limits ~config ~limits
peer_metadata_cfg peer_metadata_cfg
connection_metadata_cfg conn_metadata_cfg
Distributed_db_message.cfg >>=? fun p2p -> Distributed_db_message.cfg >>=? fun p2p ->
Lwt.async (fun () -> P2p.maintain p2p) ; Lwt.async (fun () -> P2p.maintain p2p) ;
return p2p return (p2p, conn_metadata_cfg)
type config = { type config = {
genesis: State.Chain.genesis ; genesis: State.Chain.genesis ;
@ -137,10 +149,11 @@ let create { genesis ; store_root ; context_root ;
block_validator_limits block_validator_limits
prevalidator_limits prevalidator_limits
chain_validator_limits = chain_validator_limits =
init_p2p p2p_params >>=? fun p2p -> init_p2p p2p_params >>=? fun (p2p, conn_metadata_cfg) ->
State.read State.read
~store_root ~context_root ?patch_context genesis >>=? fun (state, mainchain_state) -> ~store_root ~context_root ?patch_context genesis >>=? fun (state, mainchain_state) ->
let distributed_db = Distributed_db.create state p2p in let distributed_db =
Distributed_db.create state p2p conn_metadata_cfg.conn_meta_value in
Validator.create state distributed_db Validator.create state distributed_db
peer_validator_limits peer_validator_limits
block_validator_limits block_validator_limits