Shell: use the new P2P backend
This commit is contained in:
@ -336,13 +336,13 @@ NODE_IMPLS := \
calendar \
calendar \
cmdliner \
cohttp.lwt \
cohttp.lwt \
dynlink \
dynlink \
git \
git \
ipv6-multicast \
ipv6-multicast \
irmin.unix \
irmin.unix \
|||||| \
| \
cmdliner \
@ -8,8 +8,7 @@
open Format
open Format
open Lwt
include Logging.Make(struct let name = "attacker" end)
open Tezos_p2p
module Proto = Client_embedded_proto_bootstrap
module Proto = Client_embedded_proto_bootstrap
module Ed25519 = Proto.Local_environment.Environment.Ed25519
module Ed25519 = Proto.Local_environment.Environment.Ed25519
@ -104,141 +103,170 @@ let ballot_forged period prop vote =
operations = [ballot] }) in
operations = [ballot] }) in
forge { net_id = network } op
forge { net_id = network } op
let identity = P2p_types.Identity.generate Crypto_box.default_target
(* connect to the network, run an action and then disconnect *)
(* connect to the network, run an action and then disconnect *)
let try_action addr port action =
let try_action addr port action =
let limits : P2p.limits = {
let socket = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in
max_message_size = 1 lsl 16 ;
let uaddr = Ipaddr_unix.V6.to_inet_addr addr in
peer_answer_timeout = 10. ;
Lwt_unix.connect socket (Lwt_unix.ADDR_INET (uaddr, port)) >>= fun () ->
expected_connections = 1;
let io_sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 14) () in
min_connections = 1 ;
let conn = P2p_io_scheduler.register io_sched socket in
max_connections = 1 ;
blacklist_time = 0. ;
} in
let config : P2p.config = {
incoming_port = None ;
(addr, port)
discovery_port = None ;
identity Tezos_p2p.Raw.supported_versions >>=? fun (_, auth_fd) ->
known_peers = [(addr, port)] ;
P2p_connection.accept auth_fd Tezos_p2p.Raw.encoding >>= function
peers_file = Filename.temp_file "peers_file" ".txt";
| Error _ -> failwith "Connection rejected by peer."
closed_network = true ;
| Ok conn ->
} in
action conn >>=? fun () ->
bootstrap ~config ~limits >>= fun net ->
P2p_connection.close conn >>= fun () ->
let peer =
return ()
match peers net with
| [peer] -> peer
| _ -> Pervasives.failwith "" in
action net peer >>= fun () -> shutdown net
let replicate n x =
let replicate n x =
let rec replicate_acc acc n x =
let rec replicate_acc acc n x =
if n <= 0 then acc else replicate_acc (x :: acc) (n-1) x in
if n <= 0 then acc else replicate_acc (x :: acc) (n-1) x in
replicate_acc [] n x
replicate_acc [] n x
let request_block_times block_hash n net peer =
let send conn (msg : Tezos_p2p.msg) =
let open Block_hash in
P2p_connection.write conn (Tezos_p2p.Raw.Message msg)
let () = printf "requesting %a block %a times\n"
pp_short block_hash pp_print_int n in
let block_hashes = replicate n block_hash in
send net peer (Get_blocks block_hashes)
let request_op_times op_signed n net peer =
let request_block_times block_hash n conn =
let open Block_hash in
"requesting %a block %d times"
pp_short block_hash n >>= fun () ->
let block_hashes = replicate n block_hash in
send conn (Get_blocks block_hashes)
let request_op_times op_signed n conn =
let open Operation_hash in
let open Operation_hash in
let op_hash = hash_bytes [op_signed] in
let op_hash = hash_bytes [op_signed] in
let () = printf "sending %a transaction\n" pp_short op_hash in
lwt_log_notice "sending %a transaction" pp_short op_hash >>= fun () ->
send net peer (Operation op_signed) >>= fun () ->
send conn (Operation op_signed) >>=? fun () ->
let () = printf "requesting %a transaction %a times\n"
pp_short op_hash pp_print_int n in
"requesting %a transaction %d times"
pp_short op_hash n >>= fun () ->
let op_hashes = replicate n op_hash in
let op_hashes = replicate n op_hash in
send net peer (Get_operations op_hashes)
send conn (Get_operations op_hashes)
let send_block_size n net peer =
let send_block_size n conn =
let bytes = MBytes.create n in
let bytes = MBytes.create n in
let open Block_hash in
let open Block_hash in
let () = printf "propagating fake %a byte block %a\n"
pp_print_int n pp_short (hash_bytes [bytes]) in
"propagating fake %d byte block %a" n pp_short (hash_bytes [bytes]) >>= fun () ->
send net peer (Block bytes)
send conn (Block bytes)
let send_protocol_size n net peer =
let send_protocol_size n conn =
let bytes = MBytes.create n in
let bytes = MBytes.create n in
let open Protocol_hash in
let open Protocol_hash in
let () = printf "propagating fake %a byte protocol %a\n"
pp_print_int n pp_short (hash_bytes [bytes]) in
"propagating fake %d byte protocol %a"
send net peer (Protocol bytes)
n pp_short (hash_bytes [bytes]) >>= fun () ->
send conn (Protocol bytes)
let send_operation_size n net peer =
let send_operation_size n conn =
let op_faked = MBytes.create n in
let op_faked = MBytes.create n in
let op_hashed = Operation_hash.hash_bytes [op_faked] in
let op_hashed = Operation_hash.hash_bytes [op_faked] in
let () = printf "propagating fake %a byte operation %a\n"
pp_print_int n Operation_hash.pp_short op_hashed in
"propagating fake %d byte operation %a"
send net peer (Operation op_faked) >>= fun () ->
n Operation_hash.pp_short op_hashed >>= fun () ->
send conn (Operation op_faked) >>=? fun () ->
let block = signed (block_forged [op_hashed]) in
let block = signed (block_forged [op_hashed]) in
let block_hashed = Block_hash.hash_bytes [block] in
let block_hashed = Block_hash.hash_bytes [block] in
let () = printf "propagating block %a with operation\n"
Block_hash.pp_short block_hashed in
"propagating block %a with operation"
send net peer (Block block)
Block_hash.pp_short block_hashed >>= fun () ->
send conn (Block block)
let send_operation_bad_signature () net peer =
let send_operation_bad_signature () conn =
let open Operation_hash in
let open Operation_hash in
let signed_wrong_op = signed_wrong (tx_forged 5L 1L) in
let signed_wrong_op = signed_wrong (tx_forged 5L 1L) in
let hashed_wrong_op = hash_bytes [signed_wrong_op] in
let hashed_wrong_op = hash_bytes [signed_wrong_op] in
let () = printf "propagating operation %a with wrong signature\n"
pp_short hashed_wrong_op in
"propagating operation %a with wrong signature"
send net peer (Operation signed_wrong_op) >>= fun () ->
pp_short hashed_wrong_op >>= fun () ->
send conn (Operation signed_wrong_op) >>=? fun () ->
let block = signed (block_forged [hashed_wrong_op]) in
let block = signed (block_forged [hashed_wrong_op]) in
let block_hashed = Block_hash.hash_bytes [block] in
let block_hashed = Block_hash.hash_bytes [block] in
let () = printf "propagating block %a with operation\n"
Block_hash.pp_short block_hashed in
"propagating block %a with operation"
send net peer (Block block)
Block_hash.pp_short block_hashed >>= fun () ->
send conn (Block block)
let send_block_bad_signature () net peer =
let send_block_bad_signature () conn =
let open Block_hash in
let open Block_hash in
let signed_wrong_block = signed_wrong (block_forged []) in
let signed_wrong_block = signed_wrong (block_forged []) in
let () = printf "propagating block %a with wrong signature\n"
pp_short (hash_bytes [signed_wrong_block]) in
"propagating block %a with wrong signature"
send net peer (Block signed_wrong_block)
pp_short (hash_bytes [signed_wrong_block]) >>= fun () ->
send conn (Block signed_wrong_block)
let double_spend () net peer =
let double_spend () conn =
let spend account =
let spend account =
let op_signed = signed (tx_forged ~dest:account 199999999L 1L) in
let op_signed = signed (tx_forged ~dest:account 199999999L 1L) in
let op_hashed = Operation_hash.hash_bytes [op_signed] in
let op_hashed = Operation_hash.hash_bytes [op_signed] in
let block_signed = signed (block_forged [op_hashed]) in
let block_signed = signed (block_forged [op_hashed]) in
let block_hashed = Block_hash.hash_bytes [block_signed] in
let block_hashed = Block_hash.hash_bytes [block_signed] in
let () = printf "propagating operation %a\n"
Operation_hash.pp_short op_hashed in
"propagating operation %a"
send net peer (Operation op_signed) >>= fun () ->
Operation_hash.pp_short op_hashed >>= fun () ->
let () = printf "propagating block %a\n"
send conn (Operation op_signed) >>=? fun () ->
Block_hash.pp_short block_hashed in
send net peer (Block block_signed) in
"propagating block %a"
spend destination_account <&> spend another_account
Block_hash.pp_short block_hashed >>= fun () ->
send conn (Block block_signed) in
spend destination_account >>=? fun () ->
spend another_account
let long_chain n net peer =
let long_chain n conn =
let () = printf "propogating %a blocks\n"
lwt_log_notice "propogating %d blocks" n >>= fun () ->
pp_print_int n in
let prev_ref = ref genesis_block_hashed in
let prev_ref = ref genesis_block_hashed in
let rec loop k = if k < 1 then return_unit else
let rec loop k =
if k < 1 then
return ()
let block = signed (block_forged ~prev:!prev_ref []) in
let block = signed (block_forged ~prev:!prev_ref []) in
let () = prev_ref := Block_hash.hash_bytes [block] in
prev_ref := Block_hash.hash_bytes [block] ;
send net peer (Block block) >>= fun () -> loop (k-1) in
send conn (Block block) >>=? fun () ->
loop (k-1) in
loop n
loop n
let lots_transactions amount fee n net peer =
let lots_transactions amount fee n conn =
let signed_op = signed (tx_forged amount fee) in
let signed_op = signed (tx_forged amount fee) in
let rec loop k = if k < 1 then return_unit else
let rec loop k =
send net peer (Operation signed_op) >>= fun () -> loop (k-1) in
if k < 1 then
return ()
send conn (Operation signed_op) >>=? fun () ->
loop (k-1) in
let ops = replicate n (Operation_hash.hash_bytes [signed_op]) in
let ops = replicate n (Operation_hash.hash_bytes [signed_op]) in
let signed_block = signed (block_forged ops) in
let signed_block = signed (block_forged ops) in
let () = printf "propogating %a transactions\n"
lwt_log_notice "propogating %d transactions" n >>= fun () ->
pp_print_int n in
loop n >>=? fun () ->
loop n >>= fun () ->
let () = printf "propagating block %a with wrong signature\n"
"propagating block %a with wrong signature"
Block_hash.pp_short (Block_hash.hash_bytes [signed_block]) in
Block_hash.pp_short (Block_hash.hash_bytes [signed_block]) >>= fun () ->
send net peer (Block signed_block)
send conn (Block signed_block)
let main () =
let main () =
let addr = Ipaddr.V4 Ipaddr.V4.localhost in
let addr = Ipaddr.V6.localhost in
let port = 9732 in
let port = 9732 in
let run_action action = try_action addr port action in
let run_action action = try_action addr port action in
let run_cmd_unit lwt = Arg.Unit (fun () -> (lwt ())) in
let run_cmd_unit lwt =
let run_cmd_int_suffix lwt = Arg.String (fun str ->
Arg.Unit begin fun () ->
| begin
lwt () >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "Error: %a" pp_print_error err >>= fun () ->
end in
let run_cmd_int_suffix lwt =
Arg.String begin fun str ->
let last = str.[String.length str - 1] in
let last = str.[String.length str - 1] in
let init = String.sub str 0 (String.length str - 1) in
let init = String.sub str 0 (String.length str - 1) in
let n =
let n =
@ -249,7 +277,14 @@ let main () =
else if last == 'g' || last == 'G'
else if last == 'g' || last == 'G'
then int_of_string init * 1 lsl 30
then int_of_string init * 1 lsl 30
else int_of_string str in
else int_of_string str in
|||||| (lwt n)) in
| begin
lwt n >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "Error: %a" pp_print_error err >>= fun () ->
end in
let cmds =
let cmds =
[( "-1",
[( "-1",
run_cmd_int_suffix (run_action << request_block_times genesis_block_hashed),
run_cmd_int_suffix (run_action << request_block_times genesis_block_hashed),
File diff suppressed because it is too large
Load Diff
@ -8,86 +8,117 @@
(** A peer connection address *)
(** A peer connection address *)
type addr = Ipaddr.t
type addr = Ipaddr.V6.t
(** A peer connection port *)
(** A peer connection port *)
type port = int
type port = int
(** A p2p protocol version *)
(** A p2p protocol version *)
type version = {
module Version = P2p_types.Version
name : string ;
major : int ;
(** A global identifier for a peer, a.k.a. an identity *)
minor : int ;
module Gid = P2p_types.Gid
module Identity = P2p_types.Identity
module Point = P2p_types.Point
module Id_point = P2p_types.Id_point
module Connection_info = P2p_types.Connection_info
module Stat = P2p_types.Stat
(** Network configuration *)
(** Network configuration *)
type config = {
type config = {
listening_port : port option;
(** Tells if incoming connections accepted, precising the TCP port
(** Tells if incoming connections accepted, precising the TCP port
on which the peer can be reached *)
on which the peer can be reached *)
incoming_port : port option ;
(** Tells if peers should be discovered automatically on the local
listening_addr : addr option;
network, precising the UDP port to use *)
(** When incoming connections are accepted, precising on which
discovery_port : port option ;
IP adddress the node listen (default: [[::]]). *)
(** List of hard-coded known peers to bootstrap the network from *)
known_peers : (addr * port) list ;
trusted_points : Point.t list ;
(** The path to the JSON file where the peer cache is loaded / stored *)
(** List of hard-coded known peers to bootstrap the network from. *)
peers_file : string ;
peers_file : string ;
(** If [true], the only accepted connections are from peers whose
(** The path to the JSON file where the metadata associated to
addresses are in [known_peers] *)
gids are loaded / stored. *)
closed_network : bool ;
closed_network : bool ;
(** If [true], the only accepted connections are from peers whose
addresses are in [trusted_peers]. *)
identity : Identity.t ;
(** Cryptographic identity of the peer. *)
proof_of_work_target : ;
(** Expected level of proof of work of peers' identity. *)
(** Network capacities *)
(** Network capacities *)
type limits = {
type limits = {
(** Maximum length in bytes of network messages *)
max_message_size : int ;
authentification_timeout : float ;
(** Delay after which a non responding peer is considered dead *)
(** Delay granted to a peer to perform authentication, in seconds. *)
peer_answer_timeout : float ;
(** Minimum number of connections to reach when staring / maitening *)
expected_connections : int ;
(** Strict minimum number of connections (triggers an urgent maintenance) *)
min_connections : int ;
min_connections : int ;
(** Maximum number of connections (exceeding peers are disconnected) *)
(** Strict minimum number of connections (triggers an urgent maintenance) *)
expected_connections : int ;
(** Targeted number of connections to reach when bootstraping / maitening *)
max_connections : int ;
max_connections : int ;
(** How long peers can be blacklisted for maintenance *)
(** Maximum number of connections (exceeding peers are disconnected) *)
blacklist_time : float ;
backlog : int ;
(** Argument of [Lwt_unix.accept].*)
max_incoming_connections : int ;
(** Maximum not-yet-authentified incoming connections. *)
max_download_speed : int option ;
(** Hard-limit in the number of bytes received per second. *)
max_upload_speed : int option ;
(** Hard-limit in the number of bytes sent per second. *)
read_buffer_size : int ;
(** Size in bytes of the buffer passed to []. *)
read_queue_size : int option ;
write_queue_size : int option ;
incoming_app_message_queue_size : int option ;
incoming_message_queue_size : int option ;
outgoing_message_queue_size : int option ;
(** Various bounds for internal queues. *)
(** A global identifier for a peer, a.k.a. an identity *)
type gid
val pp_gid : Format.formatter -> gid -> unit
type 'msg encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg encoding
module type PARAMS = sig
(** Type of message used by higher layers *)
(** Type of message used by higher layers *)
type msg
module type MESSAGE = sig
type t
val encodings : msg encoding list
val encoding : t P2p_connection_pool.encoding list
(** Type of metadata associated to an identity *)
type metadata
val initial_metadata : metadata
val metadata_encoding : metadata Data_encoding.t
val score : metadata -> float
(** High level protocol(s) talked by the peer. When two peers
(** High level protocol(s) talked by the peer. When two peers
initiate a connection, they exchange their list of supported
initiate a connection, they exchange their list of supported
versions. The chosen one, if any, is the maximum common one (in
versions. The chosen one, if any, is the maximum common one (in
lexicographic order) *)
lexicographic order) *)
val supported_versions : version list
val supported_versions : Version.t list
module Make (P : PARAMS) : sig
(** Type of metadata associated to an identity *)
module type METADATA = sig
type t
val initial : t
val encoding : t Data_encoding.t
val score : t -> float
module Make (Message : MESSAGE) (Metadata : METADATA) : sig
type net
type net
@ -99,7 +130,7 @@ module Make (P : PARAMS) : sig
val bootstrap : config:config -> limits:limits -> net Lwt.t
val bootstrap : config:config -> limits:limits -> net Lwt.t
(** Return one's gid *)
(** Return one's gid *)
val gid : net -> gid
val gid : net -> Gid.t
(** A maintenance operation : try and reach the ideal number of peers *)
(** A maintenance operation : try and reach the ideal number of peers *)
val maintain : net -> unit Lwt.t
val maintain : net -> unit Lwt.t
@ -111,51 +142,47 @@ module Make (P : PARAMS) : sig
val shutdown : net -> unit Lwt.t
val shutdown : net -> unit Lwt.t
(** A connection to a peer *)
(** A connection to a peer *)
type peer
type connection
(** Access the domain of active peers *)
(** Access the domain of active peers *)
val peers : net -> peer list
val connections : net -> connection list
(** Return the active peer with identity [gid] *)
(** Return the active peer with identity [gid] *)
val find_peer : net -> gid -> peer option
val find_connection : net -> Gid.t -> connection option
type peer_info = {
gid : gid ;
addr : addr ;
port : port ;
version : version ;
total_sent : int ;
total_recv : int ;
current_inflow : float ;
current_outflow : float ;
(** Access the info of an active peer, if available *)
(** Access the info of an active peer, if available *)
val peer_info : net -> peer -> peer_info
val connection_info : net -> connection -> Connection_info.t
val connection_stat : net -> connection -> Stat.t
val global_stat : net -> Stat.t
(** Accessors for meta information about a global identifier *)
(** Accessors for meta information about a global identifier *)
val get_metadata : net -> gid -> P.metadata option
val get_metadata : net -> Gid.t -> Metadata.t option
val set_metadata : net -> gid -> P.metadata -> unit
val set_metadata : net -> Gid.t -> Metadata.t -> unit
(** Wait for a message from any peer in the network *)
(** Wait for a message from any peer in the network *)
val recv : net -> (peer * P.msg) Lwt.t
val recv : net -> (connection * Message.t) Lwt.t
(** [send net peer msg] is a thread that returns when [msg] has been
(** [send net peer msg] is a thread that returns when [msg] has been
successfully enqueued in the send queue. *)
successfully enqueued in the send queue. *)
val send : net -> peer -> P.msg -> unit Lwt.t
val send : net -> connection -> Message.t -> unit Lwt.t
(** [try_send net peer msg] is [true] if [msg] has been added to the
(** [try_send net peer msg] is [true] if [msg] has been added to the
send queue for [peer], [false] otherwise *)
send queue for [peer], [false] otherwise *)
val try_send : net -> peer -> P.msg -> bool
val try_send : net -> connection -> Message.t -> bool
(** Send a message to all peers *)
(** Send a message to all peers *)
val broadcast : net -> P.msg -> unit
val broadcast : net -> Message.t -> unit
(** Shutdown the connection to all peers at this address and stop the
communications with this machine for [duration] seconds *)
module Raw : sig
val blacklist : net -> gid -> unit
type 'a t =
| Bootstrap
(** Keep a connection to this pair as often as possible *)
| Advertise of P2p_types.Point.t list
val whitelist : net -> gid -> unit
| Message of 'a
| Disconnect
type message = Message.t t
val encoding: message Data_encoding.t
val supported_versions: P2p_types.Version.t list
@ -1,10 +1,7 @@
module Param = struct
type net_id = Store.net_id
type net_id = Store.net_id
type msg =
type msg =
| Discover_blocks of net_id * Block_hash.t list (* Block locator *)
| Discover_blocks of net_id * Block_hash.t list (* Block locator *)
| Block_inventory of net_id * Block_hash.t list
| Block_inventory of net_id * Block_hash.t list
@ -20,10 +17,14 @@ module Param = struct
| Get_protocols of Protocol_hash.t list
| Get_protocols of Protocol_hash.t list
| Protocol of MBytes.t
| Protocol of MBytes.t
let encodings =
module Message = struct
type t = msg
let encoding =
let open Data_encoding in
let open Data_encoding in
let case ?max_length ~tag encoding unwrap wrap =
let case ?max_length ~tag encoding unwrap wrap =
P2p.Encoding { tag; encoding; wrap; unwrap; max_length } in
P2p_connection_pool.Encoding { tag; encoding; wrap; unwrap; max_length } in
case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding))
case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding))
@ -71,13 +72,8 @@ module Param = struct
(fun proto -> Protocol proto);
(fun proto -> Protocol proto);
type metadata = unit
let initial_metadata = ()
let metadata_encoding = Data_encoding.empty
let score () = 0.
let supported_versions =
let supported_versions =
let open P2p in
let open P2p.Version in
[ { name = "TEZOS" ;
[ { name = "TEZOS" ;
major = 0 ;
major = 0 ;
minor = 0 ;
minor = 0 ;
@ -86,5 +82,15 @@ module Param = struct
include Param
type metadata = unit
include P2p.Make(Param)
module Metadata = struct
type t = metadata
let initial = ()
let encoding = Data_encoding.empty
let score () = 0.
include Message
include (Metadata : module type of Metadata with type t := metadata)
include P2p.Make(Message)(Metadata)
@ -13,41 +13,30 @@ val bootstrap : config:config -> limits:limits -> net Lwt.t
(** A maintenance operation : try and reach the ideal number of peers *)
(** A maintenance operation : try and reach the ideal number of peers *)
val maintain : net -> unit Lwt.t
val maintain : net -> unit Lwt.t
(** Voluntarily drop some peers and replace them by new buddies *)
(** Voluntarily drop some connections and replace them by new buddies *)
val roll : net -> unit Lwt.t
val roll : net -> unit Lwt.t
(** Close all connections properly *)
(** Close all connections properly *)
val shutdown : net -> unit Lwt.t
val shutdown : net -> unit Lwt.t
(** A connection to a peer *)
(** A connection to a peer *)
type peer
type connection
(** Access the domain of active peers *)
(** Access the domain of active connections *)
val peers : net -> peer list
val connections : net -> connection list
(** Return the active peer with identity [gid] *)
(** Return the active connection with identity [gid] *)
val find_peer : net -> gid -> peer option
val find_connection : net -> Gid.t -> connection option
type peer_info = {
(** Access the info of an active connection. *)
gid : gid ;
val connection_info : net -> connection -> Connection_info.t
addr : addr ;
port : port ;
version : version ;
total_sent : int ;
total_recv : int ;
current_inflow : float ;
current_outflow : float ;
(** Access the info of an active peer, if available *)
val peer_info : net -> peer -> peer_info
(** Accessors for meta information about a global identifier *)
(** Accessors for meta information about a global identifier *)
type metadata = unit
type metadata = unit
val get_metadata : net -> gid -> metadata option
val get_metadata : net -> Gid.t -> metadata option
val set_metadata : net -> gid -> metadata -> unit
val set_metadata : net -> Gid.t -> metadata -> unit
type net_id = Store.net_id
type net_id = Store.net_id
@ -68,23 +57,28 @@ type msg =
| Get_protocols of Protocol_hash.t list
| Get_protocols of Protocol_hash.t list
| Protocol of MBytes.t
| Protocol of MBytes.t
(** Wait for a payload from any peer in the network *)
(** Wait for a payload from any connection in the network *)
val recv : net -> (peer * msg) Lwt.t
val recv : net -> (connection * msg) Lwt.t
(** [send net peer msg] is a thread that returns when [msg] has been
(** [send net conn msg] is a thread that returns when [msg] has been
successfully enqueued in the send queue. *)
successfully enqueued in the send queue. *)
val send : net -> peer -> msg -> unit Lwt.t
val send : net -> connection -> msg -> unit Lwt.t
(** [try_send net peer msg] is [true] if [msg] has been added to the
(** [try_send net conn msg] is [true] if [msg] has been added to the
send queue for [peer], [false] otherwise *)
send queue for [peer], [false] otherwise *)
val try_send : net -> peer -> msg -> bool
val try_send : net -> connection -> msg -> bool
(** Send a payload to all peers *)
(** Send a payload to all peers *)
val broadcast : net -> msg -> unit
val broadcast : net -> msg -> unit
(** Shutdown the connection to all peers at this address and stop the
communications with this machine for [duration] seconds *)
module Raw : sig
val blacklist : net -> gid -> unit
type 'a t =
| Bootstrap
(** Keep a connection to this pair as often as possible *)
| Advertise of P2p_types.Point.t list
val whitelist : net -> gid -> unit
| Message of 'a
| Disconnect
type message = msg t
val encoding: message Data_encoding.t
val supported_versions: P2p_types.Version.t list
@ -7,6 +7,8 @@
(* *)
(* *)
module V6 = Ipaddr.V6
open Error_monad
open Error_monad
open Logging.Node.Main
open Logging.Node.Main
@ -54,15 +56,15 @@ type cfg = {
min_connections : int ;
min_connections : int ;
max_connections : int ;
max_connections : int ;
expected_connections : int ;
expected_connections : int ;
net_addr : Ipaddr.t ;
net_addr : V6.t ;
net_port : int ;
net_port : int ;
local_discovery : int option ;
(* local_discovery : (string * int) option ; *)
peers : (Ipaddr.t * int) list ;
peers : (V6.t * int) list ;
peers_cache : string ;
peers_cache : string ;
closed : bool ;
closed : bool ;
(* rpc *)
(* rpc *)
rpc_addr : (Ipaddr.t * int) option ;
rpc_addr : (V6.t * int) option ;
cors_origins : string list ;
cors_origins : string list ;
cors_headers : string list ;
cors_headers : string list ;
rpc_crt : string option ;
rpc_crt : string option ;
@ -88,9 +90,9 @@ let default_cfg_of_base_dir base_dir = {
min_connections = 4 ;
min_connections = 4 ;
max_connections = 400 ;
max_connections = 400 ;
expected_connections = 20 ;
expected_connections = 20 ;
net_addr = Ipaddr.(V6 V6.unspecified) ;
net_addr = V6.unspecified ;
net_port = 9732 ;
net_port = 9732 ;
local_discovery = None ;
(* local_discovery = None ; *)
peers = [] ;
peers = [] ;
closed = false ;
closed = false ;
peers_cache = base_dir // "peers_cache" ;
peers_cache = base_dir // "peers_cache" ;
@ -130,16 +132,21 @@ let sockaddr_of_string str =
let addr, port = String.sub str 0 pos, String.sub str (pos+1) (len - pos - 1) in
let addr, port = String.sub str 0 pos, String.sub str (pos+1) (len - pos - 1) in
match Ipaddr.of_string_exn addr, int_of_string port with
match Ipaddr.of_string_exn addr, int_of_string port with
| exception Failure _ -> `Error "not a sockaddr"
| exception Failure _ -> `Error "not a sockaddr"
| ip, port -> `Ok (ip, port)
| V4 ipv4, port -> `Ok (Ipaddr.v6_of_v4 ipv4, port)
| V6 ipv6, port -> `Ok (ipv6, port)
let sockaddr_of_string_exn str =
let sockaddr_of_string_exn str =
match sockaddr_of_string str with
match sockaddr_of_string str with
| `Ok saddr -> saddr
| `Ok saddr -> saddr
| `Error msg -> invalid_arg msg
| `Error msg -> invalid_arg msg
let pp_sockaddr fmt (ip, port) = Format.fprintf fmt "%a:%d" Ipaddr.pp_hum ip port
let pp_sockaddr fmt (ip, port) = Format.fprintf fmt "%a:%d" V6.pp_hum ip port
let string_of_sockaddr saddr = Format.asprintf "%a" pp_sockaddr saddr
let string_of_sockaddr saddr = Format.asprintf "%a" pp_sockaddr saddr
let mcast_params_of_string s = match Utils.split ':' s with
| [iface; port] -> iface, int_of_string port
| _ -> invalid_arg "mcast_params_of_string"
module Cfg_file = struct
module Cfg_file = struct
open Data_encoding
open Data_encoding
@ -150,12 +157,12 @@ module Cfg_file = struct
(opt "protocol" string)
(opt "protocol" string)
let net =
let net =
(opt "min-connections" uint16)
(opt "min-connections" uint16)
(opt "max-connections" uint16)
(opt "max-connections" uint16)
(opt "expected-connections" uint16)
(opt "expected-connections" uint16)
(opt "addr" string)
(opt "addr" string)
(opt "local-discovery" uint16)
(* (opt "local-discovery" string) *)
(opt "peers" (list string))
(opt "peers" (list string))
(dft "closed" bool false)
(dft "closed" bool false)
(opt "peers-cache" string)
(opt "peers-cache" string)
@ -174,21 +181,29 @@ module Cfg_file = struct
(fun { store ; context ; protocol ;
(fun { store ; context ; protocol ;
min_connections ; max_connections ; expected_connections ;
min_connections ; max_connections ; expected_connections ;
net_addr ; net_port ; local_discovery ; peers ;
net_addr ; net_port ;
(* local_discovery ; *)
peers ;
closed ; peers_cache ; rpc_addr ; cors_origins ; cors_headers ; log_output } ->
closed ; peers_cache ; rpc_addr ; cors_origins ; cors_headers ; log_output } ->
let net_addr = string_of_sockaddr (net_addr, net_port) in
let net_addr = string_of_sockaddr (net_addr, net_port) in
(* let local_discovery = Utils.map_option local_discovery *)
(* ~f:(fun (iface, port) -> iface ^ ":" ^ string_of_int port) *)
(* in *)
let rpc_addr = Utils.map_option string_of_sockaddr rpc_addr in
let rpc_addr = Utils.map_option string_of_sockaddr rpc_addr in
let peers = peers ~f:string_of_sockaddr in
let peers = peers ~f:string_of_sockaddr in
let log_output = string_of_log log_output in
let log_output = string_of_log log_output in
((Some store, Some context, Some protocol),
((Some store, Some context, Some protocol),
(Some min_connections, Some max_connections, Some expected_connections,
(Some min_connections, Some max_connections, Some expected_connections,
Some net_addr, local_discovery, Some peers, closed, Some peers_cache),
Some net_addr,
(* local_discovery, *)
Some peers, closed, Some peers_cache),
(rpc_addr, cors_origins, cors_headers),
(rpc_addr, cors_origins, cors_headers),
Some log_output))
Some log_output))
(fun (
(fun (
(store, context, protocol),
(store, context, protocol),
(min_connections, max_connections, expected_connections, net_addr,
(min_connections, max_connections, expected_connections, net_addr,
local_discovery, peers, closed, peers_cache),
(* local_discovery, *)
peers, closed, peers_cache),
(rpc_addr, cors_origins, cors_headers),
(rpc_addr, cors_origins, cors_headers),
log_output) ->
log_output) ->
let open Utils in
let open Utils in
@ -205,11 +220,14 @@ module Cfg_file = struct
let min_connections = unopt default_cfg.min_connections min_connections in
let min_connections = unopt default_cfg.min_connections min_connections in
let max_connections = unopt default_cfg.max_connections max_connections in
let max_connections = unopt default_cfg.max_connections max_connections in
let expected_connections = unopt default_cfg.expected_connections expected_connections in
let expected_connections = unopt default_cfg.expected_connections expected_connections in
(* let local_discovery = map_option local_discovery ~f:mcast_params_of_string in *)
{ default_cfg with
{ default_cfg with
store ; context ; protocol ;
store ; context ; protocol ;
min_connections ; max_connections ; expected_connections ;
min_connections ; max_connections ; expected_connections ;
net_addr; net_port ; local_discovery; peers; closed; peers_cache;
net_addr ; net_port ;
rpc_addr; cors_origins ; cors_headers ; log_output
(* local_discovery ; *)
peers ; closed ; peers_cache ;
rpc_addr ; cors_origins ; cors_headers ; log_output ;
@ -266,9 +284,9 @@ module Cmdline = struct
let net_addr =
let net_addr =
let doc = "The TCP address and port at which this instance can be reached." in
let doc = "The TCP address and port at which this instance can be reached." in
Arg.(value & opt (some sockaddr_converter) None & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["net-addr"])
Arg.(value & opt (some sockaddr_converter) None & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["net-addr"])
let local_discovery =
(* let local_discovery = *)
let doc = "Automatic discovery of peers on the local network." in
(* let doc = "Automatic discovery of peers on the local network." in *)
Arg.(value & opt (some int) None & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["local-discovery"])
(* Arg.(value & opt (some @@ pair string int) None & info ~docs:"NETWORK" ~doc ~docv:"IFACE:PORT" ["local-discovery"]) *)
let peers =
let peers =
let doc = "A peer to bootstrap the network from. Can be used several times to add several peers." in
let doc = "A peer to bootstrap the network from. Can be used several times to add several peers." in
Arg.(value & opt_all sockaddr_converter [] & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["peer"])
Arg.(value & opt_all sockaddr_converter [] & info ~docs:"NETWORK" ~doc ~docv:"ADDR:PORT" ["peer"])
@ -298,7 +316,9 @@ module Cmdline = struct
let parse base_dir config_file sandbox sandbox_param log_level
let parse base_dir config_file sandbox sandbox_param log_level
min_connections max_connections expected_connections
min_connections max_connections expected_connections
net_saddr local_discovery peers closed rpc_addr tls cors_origins cors_headers reset_cfg update_cfg =
(* local_discovery *)
peers closed rpc_addr tls cors_origins cors_headers reset_cfg update_cfg =
let base_dir = Utils.(unopt (unopt default_cfg.base_dir base_dir) sandbox) in
let base_dir = Utils.(unopt (unopt default_cfg.base_dir base_dir) sandbox) in
let config_file = Utils.(unopt ((unopt base_dir sandbox) // "config")) config_file in
let config_file = Utils.(unopt ((unopt base_dir sandbox) // "config")) config_file in
@ -340,7 +360,7 @@ module Cmdline = struct
expected_connections = Utils.unopt cfg.expected_connections expected_connections ;
expected_connections = Utils.unopt cfg.expected_connections expected_connections ;
net_addr = (match net_saddr with None -> cfg.net_addr | Some (addr, _) -> addr) ;
net_addr = (match net_saddr with None -> cfg.net_addr | Some (addr, _) -> addr) ;
net_port = (match net_saddr with None -> cfg.net_port | Some (_, port) -> port) ;
net_port = (match net_saddr with None -> cfg.net_port | Some (_, port) -> port) ;
local_discovery = Utils.first_some local_discovery cfg.local_discovery ;
(* local_discovery = Utils.first_some local_discovery cfg.local_discovery ; *)
peers = (match peers with [] -> cfg.peers | _ -> peers) ;
peers = (match peers with [] -> cfg.peers | _ -> peers) ;
closed = closed || cfg.closed ;
closed = closed || cfg.closed ;
rpc_addr = Utils.first_some rpc_addr cfg.rpc_addr ;
rpc_addr = Utils.first_some rpc_addr cfg.rpc_addr ;
@ -359,7 +379,9 @@ module Cmdline = struct
ret (const parse $ base_dir $ config_file
ret (const parse $ base_dir $ config_file
$ sandbox $ sandbox_param $ v
$ sandbox $ sandbox_param $ v
$ min_connections $ max_connections $ expected_connections
$ min_connections $ max_connections $ expected_connections
$ net_addr $ local_discovery $ peers $ closed
$ net_addr
(* $ local_discovery *)
$ peers $ closed
$ rpc_addr $ rpc_tls $ cors_origins $ cors_headers
$ rpc_addr $ rpc_tls $ cors_origins $ cors_headers
$ reset_config $ update_config
$ reset_config $ update_config
@ -391,10 +413,11 @@ let init_logger { log_output ; log_level } =
| `Null -> Logging.init Null
| `Null -> Logging.init Null
| `Syslog -> Logging.init Syslog
| `Syslog -> Logging.init Syslog
let init_node { sandbox ; sandbox_param ;
let init_node
{ sandbox ; sandbox_param ;
store ; context ;
store ; context ;
min_connections ; max_connections ; expected_connections ;
min_connections ; max_connections ; expected_connections ;
net_port ; peers ; peers_cache ; local_discovery ; closed } =
net_port ; peers ; peers_cache ; closed } =
let patch_context json ctxt =
let patch_context json ctxt =
let module Proto = (val Updater.get_exn genesis_protocol) in
let module Proto = (val Updater.get_exn genesis_protocol) in
@ -428,20 +451,48 @@ let init_node { sandbox ; sandbox_param ;
match sandbox with
match sandbox with
| Some _ -> None
| Some _ -> None
| None ->
| None ->
(* TODO add parameters... *)
let authentification_timeout = 5.
and backlog = 20
and max_incoming_connections = 20
and max_download_speed = None
and max_upload_speed = None
and read_buffer_size = 1 lsl 14
and read_queue_size = None
and write_queue_size = None
and incoming_app_message_queue_size = None
and incoming_message_queue_size = None
and outgoing_message_queue_size = None in
let limits =
let limits =
{ max_message_size = 10_000 ;
{ authentification_timeout ;
peer_answer_timeout = 5. ;
expected_connections ;
min_connections ;
min_connections ;
expected_connections ;
max_connections ;
max_connections ;
blacklist_time = 30. }
backlog ;
max_incoming_connections ;
max_download_speed ;
max_upload_speed ;
read_buffer_size ;
read_queue_size ;
write_queue_size ;
incoming_app_message_queue_size ;
incoming_message_queue_size ;
outgoing_message_queue_size ;
(* TODO add parameters... *)
let identity = P2p.Identity.generate Crypto_box.default_target
and listening_addr = None
and proof_of_work_target = Crypto_box.default_target in
let config =
let config =
{ incoming_port = Some net_port ;
{ listening_port = Some net_port ;
discovery_port = local_discovery ;
listening_addr ;
known_peers = peers ;
identity ;
trusted_points = peers ;
peers_file = peers_cache ;
peers_file = peers_cache ;
closed_network = closed }
closed_network = closed ;
proof_of_work_target ;
Some (config, limits) in
Some (config, limits) in
@ -458,7 +509,7 @@ let init_rpc { rpc_addr ; rpc_crt; rpc_key ; cors_origins ; cors_headers } node
lwt_log_notice "Starting the RPC server listening on port %d (TLS enabled)." port >>= fun () ->
lwt_log_notice "Starting the RPC server listening on port %d (TLS enabled)." port >>= fun () ->
let dir = Node_rpc.build_rpc_directory node in
let dir = Node_rpc.build_rpc_directory node in
let mode = `TLS (`Crt_file_path crt, `Key_file_path key, `No_password, `Port port) in
let mode = `TLS (`Crt_file_path crt, `Key_file_path key, `No_password, `Port port) in
let host = Ipaddr.to_string addr in
let host = Ipaddr.V6.to_string addr in
let () =
let () =
let old_hook = !Lwt.async_exception_hook in
let old_hook = !Lwt.async_exception_hook in
Lwt.async_exception_hook := function
Lwt.async_exception_hook := function
Reference in New Issue
Block a user