From 826f2ea4ba16fc1f816bbc79eb299300d5fc17d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 14 Mar 2017 10:51:44 +0100 Subject: [PATCH] P2p: implements peer swapping --- scripts/launch-node.sh | 2 +- src/node/main/node_config_file.ml | 14 +- src/node/net/p2p.ml | 64 +- src/node/net/p2p.mli | 13 +- src/node/net/p2p_connection.ml | 10 +- src/node/net/p2p_connection.mli | 2 + src/node/net/p2p_connection_pool.ml | 972 +++++++++++++-------- src/node/net/p2p_connection_pool.mli | 166 ++-- src/node/net/p2p_connection_pool_types.mli | 1 + src/node/net/p2p_maintenance.ml | 49 +- src/node/net/p2p_maintenance.mli | 2 +- src/node/shell/node.ml | 2 +- test/test_p2p_connection_pool.ml | 3 +- 13 files changed, 789 insertions(+), 511 deletions(-) diff --git a/scripts/launch-node.sh b/scripts/launch-node.sh index c7a967e86..fb3b7656f 100755 --- a/scripts/launch-node.sh +++ b/scripts/launch-node.sh @@ -27,7 +27,7 @@ node="$src_dir/tezos-node" cleanup () { set +e echo Cleaning up... -# rm -rf "$data_dir" + rm -rf "$data_dir" } trap cleanup EXIT INT diff --git a/src/node/main/node_config_file.ml b/src/node/main/node_config_file.ml index f6bfa39c4..b1b59a1e3 100644 --- a/src/node/main/node_config_file.ml +++ b/src/node/main/node_config_file.ml @@ -72,6 +72,7 @@ let default_net_limits : P2p.limits = { known_peer_ids_history_size = 500 ; max_known_points = Some (400, 300) ; max_known_peer_ids = Some (400, 300) ; + swap_linger = 30. ; } let default_net = { @@ -115,10 +116,11 @@ let limit : P2p.limits Data_encoding.t = incoming_message_queue_size ; outgoing_message_queue_size ; known_points_history_size ; known_peer_ids_history_size ; max_known_points ; max_known_peer_ids ; + swap_linger ; } -> ( ( authentification_timeout, min_connections, expected_connections, max_connections, backlog, max_incoming_connections, - max_download_speed, max_upload_speed) , + max_download_speed, max_upload_speed, swap_linger) , ( read_buffer_size, read_queue_size, write_queue_size, incoming_app_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size, @@ -127,7 +129,7 @@ let limit : P2p.limits Data_encoding.t = ))) (fun ( ( authentification_timeout, min_connections, expected_connections, max_connections, backlog, max_incoming_connections, - max_download_speed, max_upload_speed) , + max_download_speed, max_upload_speed, swap_linger) , ( read_buffer_size, read_queue_size, write_queue_size, incoming_app_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size, @@ -141,10 +143,9 @@ let limit : P2p.limits Data_encoding.t = incoming_app_message_queue_size ; incoming_message_queue_size ; outgoing_message_queue_size ; known_points_history_size ; known_peer_ids_history_size ; - max_known_points ; max_known_peer_ids - }) + max_known_points ; max_known_peer_ids ; swap_linger }) (merge_objs - (obj8 + (obj9 (dft "authentification-timeout" float default_net_limits.authentification_timeout) (dft "min-connections" uint16 @@ -158,7 +159,8 @@ let limit : P2p.limits Data_encoding.t = (dft "max-incoming-connections" uint8 default_net_limits.max_incoming_connections) (opt "max-download-speed" int31) - (opt "max-upload-speed" int31)) + (opt "max-upload-speed" int31) + (dft "swap-linger" float default_net_limits.swap_linger)) (obj10 (dft "read-buffer-size" int31 default_net_limits.read_buffer_size) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index ecc95dd30..10348749a 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -66,6 +66,9 @@ type limits = { known_points_history_size : int ; max_known_peer_ids : (int * int) option ; max_known_points : (int * int) option ; + + swap_linger : float ; + } let create_scheduler limits = @@ -100,6 +103,7 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched = known_points_history_size = limits.known_points_history_size ; max_known_points = limits.max_known_points ; max_known_peer_ids = limits.max_known_peer_ids ; + swap_linger = limits.swap_linger ; } in let pool = @@ -130,7 +134,8 @@ let create_maintenance_worker limits pool disco = limits.max_connections in P2p_maintenance.run - ~connection_timeout:limits.authentification_timeout bounds pool disco + ~connection_timeout:limits.authentification_timeout + bounds pool disco let may_create_welcome_worker config limits pool = match config.listening_port with @@ -190,14 +195,14 @@ module Real = struct P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched let connections { pool } () = - P2p_connection_pool.fold_connections pool + P2p_connection_pool.Connection.fold pool ~init:[] ~f:(fun _peer_id c acc -> c :: acc) let find_connection { pool } peer_id = - P2p_connection_pool.Peer_ids.find_connection pool peer_id + P2p_connection_pool.Connection.find_by_peer_id pool peer_id let connection_info _net conn = - P2p_connection_pool.connection_info conn + P2p_connection_pool.Connection.info conn let connection_stat _net conn = - P2p_connection_pool.connection_stat conn + P2p_connection_pool.Connection.stat conn let global_stat { pool } () = P2p_connection_pool.pool_stat pool let set_metadata { pool } conn meta = @@ -209,12 +214,12 @@ module Real = struct P2p_connection_pool.read conn >>=? fun msg -> lwt_debug "message read from %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) >>= fun () -> + (P2p_connection_pool.Connection.info conn) >>= fun () -> return msg let rec recv_any net () = let pipes = - P2p_connection_pool.fold_connections + P2p_connection_pool.Connection.fold net.pool ~init:[] ~f:begin fun _peer_id conn acc -> (P2p_connection_pool.is_readable conn >>= function @@ -222,7 +227,7 @@ module Real = struct | Error _ -> Lwt_utils.never_ending) :: acc end in Lwt.pick ( - ( P2p_connection_pool.PoolEvent.wait_new_connection net.pool >>= fun () -> + ( P2p_connection_pool.Pool_event.wait_new_connection net.pool >>= fun () -> Lwt.return_none ):: pipes) >>= function | None -> recv_any net () @@ -231,12 +236,12 @@ module Real = struct | Ok msg -> lwt_debug "message read from %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) >>= fun () -> + (P2p_connection_pool.Connection.info conn) >>= fun () -> Lwt.return (conn, msg) | Error _ -> lwt_debug "error reading message from %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) >>= fun () -> + (P2p_connection_pool.Connection.info conn) >>= fun () -> Lwt_unix.yield () >>= fun () -> recv_any net () @@ -245,12 +250,12 @@ module Real = struct | Ok () -> lwt_debug "message sent to %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) >>= fun () -> + (P2p_connection_pool.Connection.info conn) >>= fun () -> return () | Error err -> lwt_debug "error sending message from %a: %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) + (P2p_connection_pool.Connection.info conn) pp_print_error err >>= fun () -> Lwt.return (Error err) @@ -259,12 +264,12 @@ module Real = struct | Ok v -> debug "message trysent to %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) ; + (P2p_connection_pool.Connection.info conn) ; v | Error err -> debug "error trysending message to %a@ %a" Connection_info.pp - (P2p_connection_pool.connection_info conn) + (P2p_connection_pool.Connection.info conn) pp_print_error err ; false @@ -273,10 +278,10 @@ module Real = struct debug "message broadcasted" let fold_connections { pool } ~init ~f = - P2p_connection_pool.fold_connections pool ~init ~f + P2p_connection_pool.Connection.fold pool ~init ~f let iter_connections { pool } f = - P2p_connection_pool.fold_connections pool + P2p_connection_pool.Connection.fold pool ~init:() ~f:(fun gid conn () -> f gid conn) @@ -315,7 +320,7 @@ type ('msg, 'meta) t = { connection_info : ('msg, 'meta) connection -> Connection_info.t ; connection_stat : ('msg, 'meta) connection -> Stat.t ; global_stat : unit -> Stat.t ; - get_metadata : Peer_id.t -> 'meta option ; + get_metadata : Peer_id.t -> 'meta ; set_metadata : Peer_id.t -> 'meta -> unit ; recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ; recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ; @@ -355,7 +360,7 @@ let create ~config ~limits meta_cfg msg_cfg = on_new_connection = Real.on_new_connection net ; } -let faked_network = { +let faked_network meta_config = { peer_id = Fake.id.peer_id ; maintain = Lwt.return ; roll = Lwt.return ; @@ -365,7 +370,7 @@ let faked_network = { connection_info = (fun _ -> Fake.connection_info) ; connection_stat = (fun _ -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ; - get_metadata = (fun _ -> None) ; + get_metadata = (fun _ -> meta_config.initial) ; set_metadata = (fun _ _ -> ()) ; recv = (fun _ -> Lwt_utils.never_ending) ; recv_any = (fun () -> Lwt_utils.never_ending) ; @@ -402,6 +407,8 @@ module Raw = struct type 'a t = 'a P2p_connection_pool.Message.t = | Bootstrap | Advertise of P2p_types.Point.t list + | Swap_request of Point.t * Peer_id.t + | Swap_ack of Point.t * Peer_id.t | Message of 'a | Disconnect let encoding = P2p_connection_pool.Message.encoding @@ -414,7 +421,7 @@ module RPC = struct | None -> Stat.empty | Some pool -> P2p_connection_pool.pool_stat pool - module Event = P2p_connection_pool.LogEvent + module Event = P2p_connection_pool.Log_event let watch net = match net.pool with @@ -433,14 +440,14 @@ module RPC = struct | None -> None | Some pool -> map_option - (P2p_connection_pool.Peer_ids.find_connection pool peer_id) - ~f:P2p_connection_pool.connection_info + (P2p_connection_pool.Connection.find_by_peer_id pool peer_id) + ~f:P2p_connection_pool.Connection.info let kick net peer_id wait = match net.pool with | None -> Lwt.return_unit | Some pool -> - match P2p_connection_pool.Peer_ids.find_connection pool peer_id with + match P2p_connection_pool.Connection.find_by_peer_id pool peer_id with | None -> Lwt.return_unit | Some conn -> P2p_connection_pool.disconnect ~wait conn @@ -448,10 +455,10 @@ module RPC = struct match net.pool with | None -> [] | Some pool -> - P2p_connection_pool.fold_connections + P2p_connection_pool.Connection.fold pool ~init:[] ~f:begin fun _peer_id c acc -> - P2p_connection_pool.connection_info c :: acc + P2p_connection_pool.Connection.info c :: acc end let count net = @@ -703,12 +710,11 @@ module RPC = struct | Disconnected -> Disconnected, None in let peer_id = Peer_info.peer_id i in - let meta = Peer_info.metadata i in - let score = P2p_connection_pool.score pool meta in + let score = Peer_ids.get_score pool peer_id in let stat = - match P2p_connection_pool.Peer_ids.find_connection pool peer_id with + match P2p_connection_pool.Connection.find_by_peer_id pool peer_id with | None -> Stat.empty - | Some conn -> P2p_connection_pool.connection_stat conn + | Some conn -> P2p_connection_pool.Connection.stat conn in Peer_info.{ score ; trusted = trusted i ; diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index dc6da4224..a8547bf77 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -122,6 +122,11 @@ type limits = { max_known_peer_ids : (int * int) option ; max_known_points : (int * int) option ; (** Optional limitation of internal hashtables (max, target) *) + + swap_linger : float ; + (** Peer swapping does not occur more than once during a timespan of + [swap_linger] seconds. *) + } type ('msg, 'meta) t @@ -129,7 +134,7 @@ type ('msg, 'meta) net = ('msg, 'meta) t (** A faked p2p layer, which do not initiate any connection nor open any listening socket *) -val faked_network : ('msg, 'meta) net +val faked_network : 'meta meta_config -> ('msg, 'meta) net (** Main network initialisation function *) val create : @@ -165,7 +170,7 @@ val connection_stat : val global_stat : ('msg, 'meta) net -> Stat.t (** Accessors for meta information about a global identifier *) -val get_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta option +val get_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta val set_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta -> unit (** Wait for a message from a given connection. *) @@ -193,7 +198,7 @@ module RPC : sig val stat : ('msg, 'meta) net -> Stat.t - module Event = P2p_connection_pool.LogEvent + module Event = P2p_connection_pool.Log_event val watch : ('msg, 'meta) net -> Event.t Lwt_stream.t * Watcher.stopper val connect : ('msg, 'meta) net -> Point.t -> float -> unit tzresult Lwt.t @@ -301,6 +306,8 @@ module Raw : sig type 'a t = | Bootstrap | Advertise of P2p_types.Point.t list + | Swap_request of Point.t * Peer_id.t + | Swap_ack of Point.t * Peer_id.t | Message of 'a | Disconnect val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 5541a6afa..bff35c40e 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -206,11 +206,16 @@ let authenticate return (info, (fd, info, cryptobox_data)) type connection = { + id : int ; info : Connection_info.t ; fd : P2p_io_scheduler.connection ; cryptobox_data : Crypto.data ; } +let next_conn_id = + let cpt = ref 0 in + fun () -> incr cpt ;!cpt + module Reader = struct type 'msg t = { @@ -349,6 +354,9 @@ type 'msg t = { writer : 'msg Writer.t ; } +let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 + + let pp ppf { conn } = Connection_info.pp ppf conn.info let info { conn } = conn.info @@ -367,7 +375,7 @@ let accept end >>=? fun accepted -> fail_unless accepted Rejected >>=? fun () -> let canceler = Canceler.create () in - let conn = { fd ; info ; cryptobox_data } in + let conn = { id = next_conn_id (); fd ; info ; cryptobox_data } in let reader = Reader.run ?size:incoming_message_queue_size conn encoding canceler and writer = diff --git a/src/node/net/p2p_connection.mli b/src/node/net/p2p_connection.mli index 538ffd440..9b860273b 100644 --- a/src/node/net/p2p_connection.mli +++ b/src/node/net/p2p_connection.mli @@ -36,6 +36,8 @@ type 'msg t (** Type of an accepted connection, parametrized by the type of messages exchanged between peers. *) +val equal: 'mst t -> 'msg t -> bool + val pp : Format.formatter -> 'msg t -> unit val info: 'msg t -> Connection_info.t diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 0608b88b9..450f6e026 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -33,6 +33,8 @@ module Message = struct type 'msg t = | Bootstrap | Advertise of Point.t list + | Swap_request of Point.t * Peer_id.t + | Swap_ack of Point.t * Peer_id.t | Message of 'msg | Disconnect @@ -48,6 +50,16 @@ module Message = struct case ~tag:0x03 (Variable.list Point.encoding) (function Advertise points -> Some points | _ -> None) (fun points -> Advertise points); + case ~tag:0x04 (tup2 Point.encoding Peer_id.encoding) + (function + | Swap_request (point, peer_id) -> Some (point, peer_id) + | _ -> None) + (fun (point, peer_id) -> Swap_request (point, peer_id)) ; + case ~tag:0x05 (tup2 Point.encoding Peer_id.encoding) + (function + | Swap_ack (point, peer_id) -> Some (point, peer_id) + | _ -> None) + (fun (point, peer_id) -> Swap_ack (point, peer_id)) ; ] @ ListLabels.map msg_encoding ~f:(function Encoding { tag ; encoding ; wrap ; unwrap } -> @@ -64,6 +76,8 @@ module Answerer = struct bootstrap: unit -> Point.t list Lwt.t ; advertise: Point.t list -> unit Lwt.t ; message: int -> 'msg -> unit Lwt.t ; + swap_request: Point.t -> Peer_id.t -> unit Lwt.t ; + swap_ack: Point.t -> Peer_id.t -> unit Lwt.t ; } type 'msg t = { @@ -94,6 +108,12 @@ module Answerer = struct | Ok (_, Advertise points) -> st.callback.advertise points >>= fun () -> worker_loop st + | Ok (_, Swap_request (point, peer)) -> + st.callback.swap_request point peer >>= fun () -> + worker_loop st + | Ok (_, Swap_ack (point, peer)) -> + st.callback.swap_ack point peer >>= fun () -> + worker_loop st | Ok (size, Message msg) -> st.callback.message size msg >>= fun () -> worker_loop st @@ -125,12 +145,19 @@ module Answerer = struct end -module LogEvent = struct +module Log_event = struct + type t = + | Too_few_connections | Too_many_connections + | New_point of Point.t | New_peer of Peer_id.t + + | Gc_points + | Gc_peer_ids + | Incoming_connection of Point.t | Outgoing_connection of Point.t | Authentication_failed of Point.t @@ -138,12 +165,18 @@ module LogEvent = struct | Rejecting_request of Point.t * Id_point.t * Peer_id.t | Request_rejected of Point.t * (Id_point.t * Peer_id.t) option | Connection_established of Id_point.t * Peer_id.t + + | Swap_request_received of { source : Peer_id.t } + | Swap_ack_received of { source : Peer_id.t } + | Swap_request_sent of { source : Peer_id.t } + | Swap_ack_sent of { source : Peer_id.t } + | Swap_request_ignored of { source : Peer_id.t } + | Swap_success of { source : Peer_id.t } + | Swap_failure of { source : Peer_id.t } + | Disconnection of Peer_id.t | External_disconnection of Peer_id.t - | Gc_points - | Gc_peer_ids - let encoding = let open Data_encoding in let branch_encoding name obj = @@ -182,26 +215,30 @@ module LogEvent = struct (req "point" Point.encoding) (req "id_point" Id_point.encoding) (req "peer_id" Peer_id.encoding))) - (function Accepting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None) + (function Accepting_request (p, id_p, g) -> + Some (p, id_p, g) | _ -> None) (fun (p, id_p, g) -> Accepting_request (p, id_p, g)) ; case ~tag:8 (branch_encoding "rejecting_request" (obj3 (req "point" Point.encoding) (req "id_point" Id_point.encoding) (req "peer_id" Peer_id.encoding))) - (function Rejecting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None) + (function Rejecting_request (p, id_p, g) -> + Some (p, id_p, g) | _ -> None) (fun (p, id_p, g) -> Rejecting_request (p, id_p, g)) ; case ~tag:9 (branch_encoding "request_rejected" (obj2 (req "point" Point.encoding) - (opt "identity" (tup2 Id_point.encoding Peer_id.encoding)))) + (opt "identity" + (tup2 Id_point.encoding Peer_id.encoding)))) (function Request_rejected (p, id) -> Some (p, id) | _ -> None) (fun (p, id) -> Request_rejected (p, id)) ; case ~tag:10 (branch_encoding "connection_established" (obj2 (req "id_point" Id_point.encoding) (req "peer_id" Peer_id.encoding))) - (function Connection_established (id_p, g) -> Some (id_p, g) | _ -> None) + (function Connection_established (id_p, g) -> + Some (id_p, g) | _ -> None) (fun (id_p, g) -> Connection_established (id_p, g)) ; case ~tag:11 (branch_encoding "disconnection" (obj1 (req "peer_id" Peer_id.encoding))) @@ -217,30 +254,50 @@ module LogEvent = struct case ~tag:14 (branch_encoding "gc_peer_ids" empty) (function Gc_peer_ids -> Some () | _ -> None) (fun () -> Gc_peer_ids) ; + case ~tag:15 (branch_encoding "swap_request_received" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_request_received { source } -> Some source + | _ -> None) + (fun source -> Swap_request_received { source }) ; + case ~tag:16 (branch_encoding "swap_ack_received" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_ack_received { source } -> Some source + | _ -> None) + (fun source -> Swap_ack_received { source }) ; + case ~tag:17 (branch_encoding "swap_request_sent" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_request_sent { source } -> Some source + | _ -> None) + (fun source -> Swap_request_sent { source }) ; + case ~tag:18 (branch_encoding "swap_ack_sent" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_ack_sent { source } -> Some source + | _ -> None) + (fun source -> Swap_ack_sent { source }) ; + case ~tag:19 (branch_encoding "swap_request_ignored" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_request_ignored { source } -> Some source + | _ -> None) + (fun source -> Swap_request_ignored { source }) ; + case ~tag:20 (branch_encoding "swap_success" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_success { source } -> Some source + | _ -> None) + (fun source -> Swap_success { source }) ; + case ~tag:21 (branch_encoding "swap_failure" + (obj1 (req "source" Peer_id.encoding))) + (function + | Swap_failure { source } -> Some source + | _ -> None) + (fun source -> Swap_failure { source }) ; ] - let log watcher event = Watcher.notify watcher event - - let too_few_connections watcher = log watcher Too_few_connections - let too_many_connections watcher = log watcher Too_many_connections - let new_point watcher ~point = log watcher (New_point point) - let new_peer watcher ~peer_id = log watcher (New_peer peer_id) - let incoming_connection watcher ~point = log watcher (Incoming_connection point) - let outgoing_connection watcher ~point = log watcher (Outgoing_connection point) - let authentication_failed watcher ~point = log watcher (Authentication_failed point) - let accepting_request watcher ~id_point ~point ~peer_id = - log watcher (Accepting_request (point, id_point, peer_id)) - let rejecting_request watcher ~id_point ~point ~peer_id = - log watcher (Rejecting_request (point, id_point, peer_id)) - let request_rejected watcher ?credentials ~point = - log watcher (Request_rejected (point, credentials)) - let connection_established watcher ~id_point ~peer_id = - log watcher (Connection_established (id_point, peer_id)) - let disconnection watcher ~is_external ~peer_id = - log watcher (if is_external then External_disconnection peer_id - else Disconnection peer_id) - let gc_points watcher = log watcher Gc_points - let gc_peer_ids watcher = log watcher Gc_peer_ids end type config = { @@ -266,6 +323,8 @@ type config = { known_points_history_size : int ; max_known_points : (int * int) option ; (* max, gc target *) max_known_peer_ids : (int * int) option ; (* max, gc target *) + + swap_linger : float ; } type 'meta meta_config = { @@ -284,20 +343,23 @@ type ('msg, 'meta) t = { meta_config : 'meta meta_config ; message_config : 'msg message_config ; my_id_points : unit Point.Table.t ; - known_peer_ids : (('msg, 'meta) connection, 'meta) Peer_info.t Peer_id.Table.t ; - connected_peer_ids : (('msg, 'meta) connection, 'meta) Peer_info.t Peer_id.Table.t ; + known_peer_ids : + (('msg, 'meta) connection, 'meta) Peer_info.t Peer_id.Table.t ; + connected_peer_ids : + (('msg, 'meta) connection, 'meta) Peer_info.t Peer_id.Table.t ; known_points : ('msg, 'meta) connection Point_info.t Point.Table.t ; connected_points : ('msg, 'meta) connection Point_info.t Point.Table.t ; incoming : Canceler.t Point.Table.t ; io_sched : P2p_io_scheduler.t ; encoding : 'msg Message.t Data_encoding.t ; events : events ; - watcher : LogEvent.t Watcher.input ; + watcher : Log_event.t Watcher.input ; mutable new_connection_hook : (Peer_id.t -> ('msg, 'meta) connection -> unit) list ; + mutable latest_accepted_swap : Time.t ; + mutable latest_succesfull_swap : Time.t ; } - and events = { too_few_connections : unit Lwt_condition.t ; too_many_connections : unit Lwt_condition.t ; @@ -311,13 +373,14 @@ and ('msg, 'meta) connection = { conn : 'msg Message.t P2p_connection.t ; peer_info : (('msg, 'meta) connection, 'meta) Peer_info.t ; point_info : ('msg, 'meta) connection Point_info.t option ; - answerer : 'msg Answerer.t ; + answerer : 'msg Answerer.t Lazy.t ; + mutable last_sent_swap_request : (Time.t * Peer_id.t) option ; mutable wait_close : bool ; } type ('msg, 'meta) pool = ('msg, 'meta) t -module PoolEvent = struct +module Pool_event = struct let wait_too_few_connections pool = Lwt_condition.wait pool.events.too_few_connections let wait_too_many_connections pool = @@ -329,6 +392,7 @@ module PoolEvent = struct end let watch { watcher } = Watcher.create_stream watcher +let log { watcher } event = Watcher.notify watcher event module GcPointSet = Utils.Bounded(struct type t = Time.t * Point.t @@ -341,10 +405,10 @@ let gc_points ({ config = { max_known_points } ; known_points } as pool) = | Some (_, target) -> let now = Time.now () in (* TODO: maybe time of discovery? *) let table = GcPointSet.create target in - Point.Table.iter (fun p pi -> - if Point_info.State.is_disconnected pi then + Point.Table.iter (fun p point_info -> + if Point_info.State.is_disconnected point_info then let time = - match Point_info.last_miss pi with + match Point_info.last_miss point_info with | None -> now | Some t -> t in GcPointSet.insert (time, p) table @@ -353,19 +417,25 @@ let gc_points ({ config = { max_known_points } ; known_points } as pool) = ListLabels.iter to_remove ~f:begin fun (_, p) -> Point.Table.remove known_points p end ; - LogEvent.gc_points pool.watcher + log pool Gc_points -let register_point pool ?trusted (addr, port as point) = +let register_point pool ?trusted _source_peer_id (addr, port as point) = match Point.Table.find pool.known_points point with | exception Not_found -> - let pi = Point_info.create ?trusted addr port in + let point_info = Point_info.create ?trusted addr port in iter_option pool.config.max_known_points ~f:begin fun (max, _) -> if Point.Table.length pool.known_points >= max then gc_points pool end ; - Point.Table.add pool.known_points point pi ; - LogEvent.new_point pool.watcher point ; - pi - | pi -> pi + Point.Table.add pool.known_points point point_info ; + log pool (New_point point) ; + point_info + | point_info -> point_info + +let may_register_my_id_point pool = function + | [P2p_connection.Myself (addr, Some port)] -> + Point.Table.add pool.my_id_points (addr, port) () ; + Point.Table.remove pool.known_points (addr, port) + | _ -> () (* Bounded table used to garbage collect peer_id infos when needed. The @@ -398,7 +468,7 @@ let gc_peer_ids ({ meta_config = { score } ; ListLabels.iter to_remove ~f:begin fun (_, _, peer_id) -> Peer_id.Table.remove known_peer_ids peer_id end ; - LogEvent.gc_peer_ids pool.watcher + log pool Gc_peer_ids let register_peer pool peer_id = match Peer_id.Table.find pool.known_peer_ids peer_id with @@ -409,299 +479,10 @@ let register_peer pool peer_id = if Peer_id.Table.length pool.known_peer_ids >= max then gc_peer_ids pool end ; Peer_id.Table.add pool.known_peer_ids peer_id peer ; - LogEvent.new_peer pool.watcher peer_id ; + log pool (New_peer peer_id) ; peer | peer -> peer -let register_new_point pool _peer_id point = - if not (Point.Table.mem pool.my_id_points point) then - ignore (register_point pool point) - -let register_new_points pool peer_id points = - List.iter (register_new_point pool peer_id) points ; - Lwt.return_unit - -let compare_known_point_info p1 p2 = - (* The most-recently disconnected peers are greater. *) - (* Then come long-standing connected peers. *) - let disconnected1 = Point_info.State.is_disconnected p1 - and disconnected2 = Point_info.State.is_disconnected p2 in - let compare_last_seen p1 p2 = - match Point_info.last_seen p1, Point_info.last_seen p2 with - | None, None -> Random.int 2 * 2 - 1 (* HACK... *) - | Some _, None -> 1 - | None, Some _ -> -1 - | Some (_, time1), Some (_, time2) -> - match compare time1 time2 with - | 0 -> Random.int 2 * 2 - 1 (* HACK... *) - | x -> x in - match disconnected1, disconnected2 with - | false, false -> compare_last_seen p1 p2 - | false, true -> -1 - | true, false -> 1 - | true, true -> compare_last_seen p2 p1 - -let list_known_points pool _peer_id () = - let knowns = - Point.Table.fold (fun _ pi acc -> pi :: acc) pool.known_points [] in - let best_knowns = - Utils.take_n ~compare:compare_known_point_info 50 knowns in - Lwt.return (List.map Point_info.point best_knowns) - -let active_connections pool = Peer_id.Table.length pool.connected_peer_ids - -let create_connection pool conn id_point pi gi _version = - let peer_id = Peer_info.peer_id gi in - let canceler = Canceler.create () in - let size = - map_option pool.config.incoming_app_message_queue_size - ~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in - let messages = Lwt_pipe.create ?size () in - let callback = - { Answerer.message = - (fun size msg -> Lwt_pipe.push messages (size, msg)) ; - advertise = register_new_points pool peer_id ; - bootstrap = list_known_points pool peer_id ; - } in - let answerer = Answerer.run conn canceler callback in - let conn = - { conn ; point_info = pi ; peer_info = gi ; - messages ; canceler ; answerer ; wait_close = false } in - iter_option pi ~f:begin fun pi -> - let point = Point_info.point pi in - Point_info.State.set_running pi peer_id conn ; - Point.Table.add pool.connected_points point pi ; - end ; - LogEvent.connection_established pool.watcher ~id_point ~peer_id ; - Peer_info.State.set_running gi id_point conn ; - Peer_id.Table.add pool.connected_peer_ids peer_id gi ; - Lwt_condition.broadcast pool.events.new_connection () ; - Canceler.on_cancel canceler begin fun () -> - lwt_debug "Disconnect: %a (%a)" - Peer_id.pp peer_id Id_point.pp id_point >>= fun () -> - iter_option ~f:Point_info.State.set_disconnected pi; - LogEvent.disconnection pool.watcher ~is_external:false ~peer_id ; - Peer_info.State.set_disconnected gi ; - iter_option pi ~f:begin fun pi -> - Point.Table.remove pool.connected_points (Point_info.point pi) ; - end ; - Peer_id.Table.remove pool.connected_peer_ids peer_id ; - if pool.config.max_connections <= active_connections pool then begin - Lwt_condition.broadcast pool.events.too_many_connections () ; - LogEvent.too_many_connections pool.watcher ; - end ; - P2p_connection.close ~wait:conn.wait_close conn.conn - end ; - List.iter (fun f -> f peer_id conn) pool.new_connection_hook ; - if active_connections pool < pool.config.min_connections then begin - Lwt_condition.broadcast pool.events.too_few_connections () ; - LogEvent.too_few_connections pool.watcher ; - end ; - conn - -let disconnect ?(wait = false) conn = - conn.wait_close <- wait ; - Canceler.cancel conn.canceler >>= fun () -> - conn.answerer.worker - -type error += Rejected of Peer_id.t -type error += Unexpected_point_state -type error += Unexpected_peer_id_state - -let may_register_my_id_point pool = function - | [P2p_connection.Myself (addr, Some port)] -> - Point.Table.add pool.my_id_points (addr, port) () ; - Point.Table.remove pool.known_points (addr, port) - | _ -> () - -let authenticate pool ?pi canceler fd point = - let incoming = pi = None in - lwt_debug "authenticate: %a%s" - Point.pp point - (if incoming then " incoming" else "") >>= fun () -> - Lwt_utils.protect ~canceler begin fun () -> - P2p_connection.authenticate - ~proof_of_work_target:pool.config.proof_of_work_target - ~incoming (P2p_io_scheduler.register pool.io_sched fd) point - ?listening_port:pool.config.listening_port - pool.config.identity pool.message_config.versions - end ~on_error: begin fun err -> - (* Authentication incorrect! *) - (* TODO do something when the error is Not_enough_proof_of_work ?? *) - lwt_debug "@[authenticate: %a%s -> failed@ %a@]" - Point.pp point - (if incoming then " incoming" else "") - pp_print_error err >>= fun () -> - may_register_my_id_point pool err ; - LogEvent.authentication_failed pool.watcher ~point ; - if incoming then - Point.Table.remove pool.incoming point - else - iter_option Point_info.State.set_disconnected pi ; - Lwt.return (Error err) - end >>=? fun (info, auth_fd) -> - (* Authentication correct! *) - lwt_debug "authenticate: %a -> auth %a" - Point.pp point - Connection_info.pp info >>= fun () -> - let remote_pi = - match info.id_point with - | addr, Some port - when not (Point.Table.mem pool.my_id_points (addr, port)) -> - Some (register_point pool (addr, port)) - | _ -> None in - let connection_pi = - match pi, remote_pi with - | None, None -> None - | Some _ as pi, _ | _, (Some _ as pi) -> pi in - let gi = register_peer pool info.peer_id in - let acceptable_versions = - Version.common info.versions pool.message_config.versions - in - let acceptable_point = - unopt_map connection_pi - ~default:(not pool.config.closed_network) - ~f:begin fun connection_pi -> - match Point_info.State.get connection_pi with - | Requested _ -> not incoming - | Disconnected -> - not pool.config.closed_network - || Point_info.trusted connection_pi - | Accepted _ | Running _ -> false - end - in - let acceptable_peer_id = - match Peer_info.State.get gi with - | Accepted _ -> - (* TODO: in some circumstances cancel and accept... *) - false - | Running _ -> false - | Disconnected -> true - in - if incoming then - Point.Table.remove pool.incoming point ; - match acceptable_versions with - | Some version when acceptable_peer_id && acceptable_point -> begin - LogEvent.accepting_request pool.watcher - ~id_point:info.id_point ~point ~peer_id:info.peer_id ; - iter_option connection_pi - ~f:(fun pi -> Point_info.State.set_accepted pi info.peer_id canceler) ; - Peer_info.State.set_accepted gi info.id_point canceler ; - lwt_debug "authenticate: %a -> accept %a" - Point.pp point - Connection_info.pp info >>= fun () -> - Lwt_utils.protect ~canceler begin fun () -> - P2p_connection.accept - ?incoming_message_queue_size:pool.config.incoming_message_queue_size - ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size - auth_fd pool.encoding >>= fun conn -> - lwt_debug "authenticate: %a -> Connected %a" - Point.pp point - Connection_info.pp info >>= fun () -> - Lwt.return conn - end ~on_error: begin fun err -> - if incoming then - LogEvent.request_rejected pool.watcher - ~credentials:(info.id_point, info.peer_id) ~point ; - lwt_debug "authenticate: %a -> rejected %a" - Point.pp point - Connection_info.pp info >>= fun () -> - iter_option connection_pi ~f:Point_info.State.set_disconnected; - Peer_info.State.set_disconnected gi ; - Lwt.return (Error err) - end >>=? fun conn -> - let id_point = - match info.id_point, map_option Point_info.point pi with - | (addr, _), Some (_, port) -> addr, Some port - | id_point, None -> id_point in - return (create_connection pool conn id_point connection_pi gi version) - end - | _ -> begin - LogEvent.rejecting_request pool.watcher - ~id_point:info.id_point ~point ~peer_id:info.peer_id ; - lwt_debug "authenticate: %a -> kick %a point: %B peer_id: %B" - Point.pp point - Connection_info.pp info - acceptable_point acceptable_peer_id >>= fun () -> - P2p_connection.kick auth_fd >>= fun () -> - if not incoming then begin - iter_option ~f:Point_info.State.set_disconnected pi ; - (* FIXME Peer_info.State.set_disconnected ~requested:true gi ; *) - end ; - fail (Rejected info.peer_id) - end - -type error += Pending_connection -type error += Connected -type error += Connection_closed = P2p_io_scheduler.Connection_closed -type error += Connection_refused -type error += Closed_network - -let fail_unless_disconnected_point pi = - match Point_info.State.get pi with - | Disconnected -> return () - | Requested _ | Accepted _ -> fail Pending_connection - | Running _ -> fail Connected - -let fail_unless_disconnected_peer_id gi = - match Peer_info.State.get gi with - | Disconnected -> return () - | Accepted _ -> fail Pending_connection - | Running _ -> fail Connected - -let raw_connect canceler pool point = - let pi = register_point pool point in - let addr, port as point = Point_info.point pi in - fail_unless - (not pool.config.closed_network || Point_info.trusted pi) - Closed_network >>=? fun () -> - fail_unless_disconnected_point pi >>=? fun () -> - Point_info.State.set_requested pi canceler ; - let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in - let uaddr = - Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in - lwt_debug "connect: %a" Point.pp point >>= fun () -> - Lwt_utils.protect ~canceler begin fun () -> - LogEvent.outgoing_connection pool.watcher ~point ; - Lwt_unix.connect fd uaddr >>= fun () -> - return () - end ~on_error: begin fun err -> - lwt_debug "connect: %a -> disconnect" Point.pp point >>= fun () -> - Point_info.State.set_disconnected pi ; - Lwt_utils.safe_close fd >>= fun () -> - match err with - | [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] -> - fail Connection_refused - | err -> Lwt.return (Error err) - end >>=? fun () -> - lwt_debug "connect: %a -> authenticate" Point.pp point >>= fun () -> - authenticate pool ~pi canceler fd point - -type error += Too_many_connections - -let connect ~timeout pool point = - fail_unless - (active_connections pool <= pool.config.max_connections) - Too_many_connections >>=? fun () -> - let canceler = Canceler.create () in - Lwt_utils.with_timeout ~canceler timeout begin fun canceler -> - raw_connect canceler pool point - end - -let accept pool fd point = - LogEvent.incoming_connection pool.watcher ~point ; - if pool.config.max_incoming_connections <= Point.Table.length pool.incoming - || pool.config.max_connections <= active_connections pool then - Lwt.async (fun () -> Lwt_utils.safe_close fd) - else - let canceler = Canceler.create () in - Point.Table.add pool.incoming point canceler ; - Lwt.async begin fun () -> - Lwt_utils.with_timeout - ~canceler pool.config.authentification_timeout - (fun canceler -> authenticate pool canceler fd point) - end - (***************************************************************************) @@ -729,8 +510,8 @@ let write_now { conn } msg = let write_all pool msg = Peer_id.Table.iter - (fun _peer_id gi -> - match Peer_info.State.get gi with + (fun _peer_id peer_info -> + match Peer_info.State.get peer_info with | Running { data = conn } -> ignore (write_now conn msg : bool tzresult ) | _ -> ()) @@ -738,8 +519,8 @@ let write_all pool msg = let broadcast_bootstrap_msg pool = Peer_id.Table.iter - (fun _peer_id gi -> - match Peer_info.State.get gi with + (fun _peer_id peer_info -> + match Peer_info.State.get peer_info with | Running { data = { conn } } -> ignore (P2p_connection.write_now conn Bootstrap : bool tzresult ) | _ -> ()) @@ -757,12 +538,11 @@ module Peer_ids = struct with Not_found -> None let get_metadata pool peer_id = - try Some (Peer_info.metadata (Peer_id.Table.find pool.known_peer_ids peer_id)) - with Not_found -> None + try Peer_info.metadata (Peer_id.Table.find pool.known_peer_ids peer_id) + with Not_found -> pool.meta_config.initial let get_score pool peer_id = - try Some (pool.meta_config.score @@ Peer_info.metadata (Peer_id.Table.find pool.known_peer_ids peer_id)) - with Not_found -> None + pool.meta_config.score (get_metadata pool peer_id) let set_metadata pool peer_id data = Peer_info.set_metadata (register_peer pool peer_id) data @@ -779,14 +559,6 @@ module Peer_ids = struct try Peer_info.unset_trusted (Peer_id.Table.find pool.known_peer_ids peer_id) with Not_found -> () - let find_connection pool peer_id = - apply_option - (info pool peer_id) - ~f:(fun p -> - match Peer_info.State.get p with - | Running { data } -> Some data - | _ -> None) - let fold_known pool ~init ~f = Peer_id.Table.fold f pool.known_peer_ids init @@ -795,13 +567,6 @@ module Peer_ids = struct end -let fold_connections pool ~init ~f = - Peer_ids.fold_connected pool ~init ~f:begin fun peer_id gi acc -> - match Peer_info.State.get gi with - | Running { data } -> f peer_id data acc - | _ -> acc - end - module Points = struct type ('msg, 'meta) info = ('msg, 'meta) connection Point_info.t @@ -810,44 +575,481 @@ module Points = struct try Some (Point.Table.find known_points point) with Not_found -> None - let get_trusted pool peer_id = - try Point_info.trusted (Point.Table.find pool.known_points peer_id) + let get_trusted pool point = + try Point_info.trusted (Point.Table.find pool.known_points point) with Not_found -> false - let set_trusted pool peer_id = - try Point_info.set_trusted (register_point pool peer_id) + let set_trusted pool point = + try + Point_info.set_trusted + (register_point pool pool.config.identity.peer_id point) with Not_found -> () let unset_trusted pool peer_id = try Point_info.unset_trusted (Point.Table.find pool.known_points peer_id) with Not_found -> () - let find_connection pool point = - apply_option - (info pool point) - ~f:(fun p -> - match Point_info.State.get p with - | Running { data } -> Some data - | _ -> None) - let fold_known pool ~init ~f = - Point.Table.fold f pool.known_points init + Point.Table.fold f pool.known_points init let fold_connected pool ~init ~f = Point.Table.fold f pool.connected_points init end -let connection_stat { conn } = - P2p_connection.stat conn +module Connection = struct + + let fold pool ~init ~f = + Peer_ids.fold_connected pool ~init ~f:begin fun peer_id peer_info acc -> + match Peer_info.State.get peer_info with + | Running { data } -> f peer_id data acc + | _ -> acc + end + + let list pool = + fold pool ~init:[] ~f:(fun peer_id c acc -> (peer_id, c) :: acc) + + let random ?different_than pool = + let candidates = + fold pool ~init:[] ~f:begin fun _peer conn acc -> + match different_than with + | Some excluded_conn + when P2p_connection.equal conn.conn excluded_conn.conn -> acc + | Some _ | None -> conn :: acc + end in + match candidates with + | [] -> None + | _ :: _ -> + Some (List.nth candidates (Random.int @@ List.length candidates)) + + let random_lowid ?different_than pool = + let candidates = + fold pool ~init:[] ~f:begin fun _peer conn acc -> + match different_than with + | Some excluded_conn + when P2p_connection.equal conn.conn excluded_conn.conn -> acc + | Some _ | None -> + let ci = P2p_connection.info conn.conn in + match ci.id_point with + | _, None -> acc + | addr, Some port -> ((addr, port), ci.peer_id, conn) :: acc + end in + match candidates with + | [] -> None + | _ :: _ -> + Some (List.nth candidates (Random.int @@ List.length candidates)) + + let stat { conn } = + P2p_connection.stat conn + + let score { meta_config = { score }} meta = score meta + + let info { conn } = + P2p_connection.info conn + + let find_by_peer_id pool peer_id = + apply_option + (Peer_ids.info pool peer_id) + ~f:(fun p -> + match Peer_info.State.get p with + | Running { data } -> Some data + | _ -> None) + + let find_by_point pool point = + apply_option + (Points.info pool point) + ~f:(fun p -> + match Point_info.State.get p with + | Running { data } -> Some data + | _ -> None) + +end let pool_stat { io_sched } = P2p_io_scheduler.global_stat io_sched -let score { meta_config = { score }} meta = score meta -let connection_info { conn } = - P2p_connection.info conn +(***************************************************************************) + +type error += Rejected of Peer_id.t +type error += Unexpected_point_state +type error += Unexpected_peer_id_state +type error += Pending_connection +type error += Connected +type error += Connection_closed = P2p_io_scheduler.Connection_closed +type error += Connection_refused +type error += Closed_network +type error += Too_many_connections + +let fail_unless_disconnected_point point_info = + match Point_info.State.get point_info with + | Disconnected -> return () + | Requested _ | Accepted _ -> fail Pending_connection + | Running _ -> fail Connected + +let fail_unless_disconnected_peer_id peer_info = + match Peer_info.State.get peer_info with + | Disconnected -> return () + | Accepted _ -> fail Pending_connection + | Running _ -> fail Connected + +let compare_known_point_info p1 p2 = + (* The most-recently disconnected peers are greater. *) + (* Then come long-standing connected peers. *) + let disconnected1 = Point_info.State.is_disconnected p1 + and disconnected2 = Point_info.State.is_disconnected p2 in + let compare_last_seen p1 p2 = + match Point_info.last_seen p1, Point_info.last_seen p2 with + | None, None -> Random.int 2 * 2 - 1 (* HACK... *) + | Some _, None -> 1 + | None, Some _ -> -1 + | Some (_, time1), Some (_, time2) -> + match compare time1 time2 with + | 0 -> Random.int 2 * 2 - 1 (* HACK... *) + | x -> x in + match disconnected1, disconnected2 with + | false, false -> compare_last_seen p1 p2 + | false, true -> -1 + | true, false -> 1 + | true, true -> compare_last_seen p2 p1 + +let rec connect ~timeout pool point = + fail_unless + (active_connections pool <= pool.config.max_connections) + Too_many_connections >>=? fun () -> + let canceler = Canceler.create () in + Lwt_utils.with_timeout ~canceler timeout begin fun canceler -> + let point_info = + register_point pool pool.config.identity.peer_id point in + let addr, port as point = Point_info.point point_info in + fail_unless + (not pool.config.closed_network || Point_info.trusted point_info) + Closed_network >>=? fun () -> + fail_unless_disconnected_point point_info >>=? fun () -> + Point_info.State.set_requested point_info canceler ; + let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let uaddr = + Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in + lwt_debug "connect: %a" Point.pp point >>= fun () -> + Lwt_utils.protect ~canceler begin fun () -> + log pool (Outgoing_connection point) ; + Lwt_unix.connect fd uaddr >>= fun () -> + return () + end ~on_error: begin fun err -> + lwt_debug "connect: %a -> disconnect" Point.pp point >>= fun () -> + Point_info.State.set_disconnected point_info ; + Lwt_utils.safe_close fd >>= fun () -> + match err with + | [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] -> + fail Connection_refused + | err -> Lwt.return (Error err) + end >>=? fun () -> + lwt_debug "connect: %a -> authenticate" Point.pp point >>= fun () -> + authenticate pool ~point_info canceler fd point + end + +and authenticate pool ?point_info canceler fd point = + let incoming = point_info = None in + lwt_debug "authenticate: %a%s" + Point.pp point + (if incoming then " incoming" else "") >>= fun () -> + Lwt_utils.protect ~canceler begin fun () -> + P2p_connection.authenticate + ~proof_of_work_target:pool.config.proof_of_work_target + ~incoming (P2p_io_scheduler.register pool.io_sched fd) point + ?listening_port:pool.config.listening_port + pool.config.identity pool.message_config.versions + end ~on_error: begin fun err -> + (* Authentication incorrect! *) + (* TODO do something when the error is Not_enough_proof_of_work ?? *) + lwt_debug "@[authenticate: %a%s -> failed@ %a@]" + Point.pp point + (if incoming then " incoming" else "") + pp_print_error err >>= fun () -> + may_register_my_id_point pool err ; + log pool (Authentication_failed point) ; + if incoming then + Point.Table.remove pool.incoming point + else + iter_option Point_info.State.set_disconnected point_info ; + Lwt.return (Error err) + end >>=? fun (info, auth_fd) -> + (* Authentication correct! *) + lwt_debug "authenticate: %a -> auth %a" + Point.pp point + Connection_info.pp info >>= fun () -> + let remote_point_info = + match info.id_point with + | addr, Some port + when not (Point.Table.mem pool.my_id_points (addr, port)) -> + Some (register_point pool info.peer_id (addr, port)) + | _ -> None in + let connection_point_info = + match point_info, remote_point_info with + | None, None -> None + | Some _ as point_info, _ | _, (Some _ as point_info) -> point_info in + let peer_info = register_peer pool info.peer_id in + let acceptable_versions = + Version.common info.versions pool.message_config.versions + in + let acceptable_point = + unopt_map connection_point_info + ~default:(not pool.config.closed_network) + ~f:begin fun connection_point_info -> + match Point_info.State.get connection_point_info with + | Requested _ -> not incoming + | Disconnected -> + not pool.config.closed_network + || Point_info.trusted connection_point_info + | Accepted _ | Running _ -> false + end + in + let acceptable_peer_id = + match Peer_info.State.get peer_info with + | Accepted _ -> + (* TODO: in some circumstances cancel and accept... *) + false + | Running _ -> false + | Disconnected -> true + in + if incoming then + Point.Table.remove pool.incoming point ; + match acceptable_versions with + | Some version when acceptable_peer_id && acceptable_point -> begin + log pool (Accepting_request (point, info.id_point, info.peer_id)) ; + iter_option connection_point_info + ~f:(fun point_info -> + Point_info.State.set_accepted point_info info.peer_id canceler) ; + Peer_info.State.set_accepted peer_info info.id_point canceler ; + lwt_debug "authenticate: %a -> accept %a" + Point.pp point + Connection_info.pp info >>= fun () -> + Lwt_utils.protect ~canceler begin fun () -> + P2p_connection.accept + ?incoming_message_queue_size:pool.config.incoming_message_queue_size + ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size + auth_fd pool.encoding >>= fun conn -> + lwt_debug "authenticate: %a -> Connected %a" + Point.pp point + Connection_info.pp info >>= fun () -> + Lwt.return conn + end ~on_error: begin fun err -> + if incoming then + log pool + (Request_rejected (point, Some (info.id_point, info.peer_id))) ; + lwt_debug "authenticate: %a -> rejected %a" + Point.pp point + Connection_info.pp info >>= fun () -> + iter_option connection_point_info + ~f:Point_info.State.set_disconnected ; + Peer_info.State.set_disconnected peer_info ; + Lwt.return (Error err) + end >>=? fun conn -> + let id_point = + match info.id_point, map_option Point_info.point point_info with + | (addr, _), Some (_, port) -> addr, Some port + | id_point, None -> id_point in + return + (create_connection + pool conn + id_point connection_point_info peer_info version) + end + | _ -> begin + log pool (Rejecting_request (point, info.id_point, info.peer_id)) ; + lwt_debug "authenticate: %a -> kick %a point: %B peer_id: %B" + Point.pp point + Connection_info.pp info + acceptable_point acceptable_peer_id >>= fun () -> + P2p_connection.kick auth_fd >>= fun () -> + if not incoming then begin + iter_option ~f:Point_info.State.set_disconnected point_info ; + (* FIXME Peer_info.State.set_disconnected ~requested:true peer_info ; *) + end ; + fail (Rejected info.peer_id) + end + +and create_connection pool p2p_conn id_point point_info peer_info _version = + let peer_id = Peer_info.peer_id peer_info in + let canceler = Canceler.create () in + let size = + map_option pool.config.incoming_app_message_queue_size + ~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in + let messages = Lwt_pipe.create ?size () in + let rec callback = + { Answerer.message = + (fun size msg -> Lwt_pipe.push messages (size, msg)) ; + advertise = + (fun points -> register_new_points pool conn points ) ; + bootstrap = + (fun () -> list_known_points pool conn () ) ; + swap_request = + (fun point peer_id -> swap_request pool conn point peer_id ) ; + swap_ack = + (fun point peer_id -> swap_ack pool conn point peer_id ) ; + } + and answerer = lazy (Answerer.run p2p_conn canceler callback) + and conn = + { conn = p2p_conn ; point_info ; peer_info ; + messages ; canceler ; answerer ; wait_close = false ; + last_sent_swap_request = None } in + ignore (Lazy.force answerer) ; + iter_option point_info ~f:begin fun point_info -> + let point = Point_info.point point_info in + Point_info.State.set_running point_info peer_id conn ; + Point.Table.add pool.connected_points point point_info ; + end ; + log pool (Connection_established (id_point, peer_id)) ; + Peer_info.State.set_running peer_info id_point conn ; + Peer_id.Table.add pool.connected_peer_ids peer_id peer_info ; + Lwt_condition.broadcast pool.events.new_connection () ; + Canceler.on_cancel canceler begin fun () -> + lwt_debug "Disconnect: %a (%a)" + Peer_id.pp peer_id Id_point.pp id_point >>= fun () -> + iter_option ~f:Point_info.State.set_disconnected point_info ; + log pool (Disconnection peer_id) ; + Peer_info.State.set_disconnected peer_info ; + iter_option point_info ~f:begin fun point_info -> + Point.Table.remove pool.connected_points (Point_info.point point_info) ; + end ; + Peer_id.Table.remove pool.connected_peer_ids peer_id ; + if pool.config.max_connections <= active_connections pool then begin + Lwt_condition.broadcast pool.events.too_many_connections () ; + log pool Too_many_connections ; + end ; + P2p_connection.close ~wait:conn.wait_close conn.conn + end ; + List.iter (fun f -> f peer_id conn) pool.new_connection_hook ; + if active_connections pool < pool.config.min_connections then begin + Lwt_condition.broadcast pool.events.too_few_connections () ; + log pool Too_few_connections ; + end ; + conn + +and disconnect ?(wait = false) conn = + conn.wait_close <- wait ; + Answerer.shutdown (Lazy.force conn.answerer) + +and register_new_points pool conn = + let source_peer_id = Peer_info.peer_id conn.peer_info in + fun points -> + List.iter (register_new_point pool source_peer_id) points ; + Lwt.return_unit + +and register_new_point pool _source_peer_id point = + if not (Point.Table.mem pool.my_id_points point) then + ignore (register_point pool _source_peer_id point) + +and list_known_points pool _conn () = + let knowns = + Point.Table.fold + (fun _ point_info acc -> point_info :: acc) + pool.known_points [] in + let best_knowns = + Utils.take_n ~compare:compare_known_point_info 50 knowns in + Lwt.return (List.map Point_info.point best_knowns) + +and active_connections pool = Peer_id.Table.length pool.connected_peer_ids + +and swap_request pool conn new_point _new_peer_id = + let source_peer_id = Peer_info.peer_id conn.peer_info in + log pool (Swap_request_received { source = source_peer_id }) ; + lwt_log_info + "Swap request received from %a" Peer_id.pp source_peer_id >>= fun () -> + (* Ignore if already connected to peer or already swapped less + than seconds ago. *) + let now = Time.now () in + let span_since_last_swap = + Int64.to_int @@ + Time.diff now + (Time.max pool.latest_succesfull_swap pool.latest_accepted_swap) in + let new_point_info = register_point pool source_peer_id new_point in + if span_since_last_swap < int_of_float pool.config.swap_linger + || not (Point_info.State.is_disconnected new_point_info) then begin + log pool (Swap_request_ignored { source = source_peer_id }) ; + lwt_log_info "Ignoring swap request from %a" Peer_id.pp source_peer_id + end else begin + match Connection.random_lowid pool with + | None -> + lwt_log_info + "No swap candidate for %a" Peer_id.pp source_peer_id + | Some (proposed_point, proposed_peer_id, _proposed_conn) -> + match P2p_connection.write_now + conn.conn (Swap_ack (proposed_point, proposed_peer_id)) with + | Ok true -> + log pool (Swap_ack_sent { source = source_peer_id }) ; + swap pool conn proposed_peer_id new_point >>= fun () -> + Lwt.return_unit + | Ok false -> + log pool (Swap_request_received { source = source_peer_id }) ; + Lwt.return_unit + | Error _ -> + log pool (Swap_request_received { source = source_peer_id }) ; + Lwt.return_unit + end + +and swap_ack pool conn new_point _new_peer_id = + let source_peer_id = Peer_info.peer_id conn.peer_info in + log pool (Swap_ack_received { source = source_peer_id }) ; + lwt_log_info + "Swap ack received from %a" Peer_id.pp source_peer_id >>= fun () -> + match conn.last_sent_swap_request with + | None -> Lwt.return_unit (* ignore *) + | Some (_time, proposed_peer_id) -> + match Connection.find_by_peer_id pool proposed_peer_id with + | None -> + swap pool conn proposed_peer_id new_point >>= fun () -> + Lwt.return_unit + | Some _ -> + Lwt.return_unit + +and swap pool conn current_peer_id new_point = + let source_peer_id = Peer_info.peer_id conn.peer_info in + pool.latest_accepted_swap <- Time.now () ; + connect ~timeout:10. pool new_point >>= function + | Ok _new_conn -> begin + pool.latest_succesfull_swap <- Time.now () ; + log pool (Swap_success { source = source_peer_id }) ; + lwt_log_info "Swap to %a succeeded" Point.pp new_point >>= fun () -> + match Connection.find_by_peer_id pool current_peer_id with + | None -> Lwt.return_unit + | Some conn -> + disconnect conn >>= fun () -> + Lwt.return_unit + end + | Error err -> begin + pool.latest_accepted_swap <- pool.latest_succesfull_swap ; + log pool (Swap_failure { source = source_peer_id }) ; + lwt_log_error "Swap to %a failed: %a" + Point.pp new_point pp_print_error err + end + +let accept pool fd point = + log pool (Incoming_connection point) ; + if pool.config.max_incoming_connections <= Point.Table.length pool.incoming + || pool.config.max_connections <= active_connections pool then + Lwt.async (fun () -> Lwt_utils.safe_close fd) + else + let canceler = Canceler.create () in + Point.Table.add pool.incoming point canceler ; + Lwt.async begin fun () -> + Lwt_utils.with_timeout + ~canceler pool.config.authentification_timeout + (fun canceler -> authenticate pool canceler fd point) + end + +let send_swap_request pool = + match Connection.random pool with + | None -> () + | Some recipient -> + let recipient_peer_id = (Connection.info recipient).peer_id in + match Connection.random_lowid ~different_than:recipient pool with + | None -> () + | Some (proposed_point, proposed_peer_id, _proposed_conn) -> + log pool (Swap_request_sent { source = recipient_peer_id }) ; + recipient.last_sent_swap_request <- + Some (Time.now (), proposed_peer_id) ; + ignore (P2p_connection.write_now recipient.conn + (Swap_request (proposed_point, proposed_peer_id))) (***************************************************************************) @@ -871,12 +1073,16 @@ let create config meta_config message_config io_sched = events ; watcher = Watcher.create_input () ; new_connection_hook = [] ; + latest_accepted_swap = Time.epoch ; + latest_succesfull_swap = Time.epoch ; } in List.iter (Points.set_trusted pool) config.trusted_points ; Peer_info.File.load config.peers_file meta_config.encoding >>= function | Ok peer_ids -> List.iter - (fun gi -> Peer_id.Table.add pool.known_peer_ids (Peer_info.peer_id gi) gi) + (fun peer_info -> + let peer_id = Peer_info.peer_id peer_info in + Peer_id.Table.add pool.known_peer_ids peer_id peer_info) peer_ids ; Lwt.return pool | Error err -> @@ -885,16 +1091,16 @@ let create config meta_config message_config io_sched = Lwt.return pool let destroy pool = - Point.Table.fold (fun _point pi acc -> - match Point_info.State.get pi with + Point.Table.fold (fun _point point_info acc -> + match Point_info.State.get point_info with | Requested { cancel } | Accepted { cancel } -> Canceler.cancel cancel >>= fun () -> acc | Running { data = conn } -> disconnect conn >>= fun () -> acc | Disconnected -> acc) pool.known_points @@ - Peer_id.Table.fold (fun _peer_id gi acc -> - match Peer_info.State.get gi with + Peer_id.Table.fold (fun _peer_id peer_info acc -> + match Peer_info.State.get peer_info with | Accepted { cancel } -> Canceler.cancel cancel >>= fun () -> acc | Running { data = conn } -> diff --git a/src/node/net/p2p_connection_pool.mli b/src/node/net/p2p_connection_pool.mli index 4255aa977..43d13fafb 100644 --- a/src/node/net/p2p_connection_pool.mli +++ b/src/node/net/p2p_connection_pool.mli @@ -108,6 +108,11 @@ type config = { max_known_peer_ids : (int * int) option ; (** Like [max_known_points], but for known peer_ids. *) + + swap_linger : float ; + (** Peer swapping does not occur more than once during a timespan of + [spap_linger] seconds. *) + } type 'meta meta_config = { @@ -142,13 +147,12 @@ val pool_stat: ('msg, 'meta) pool -> Stat.t (** [pool_stat pool] is a snapshot of current bandwidth usage for the entire [pool]. *) -val score: ('msg, 'meta) pool -> 'meta -> float -(** [score pool meta] is the floating-point score of [meta] using - [pool]'s metrics. *) +val send_swap_request: ('msg, 'meta) pool -> unit (** {2 Pool events} *) -module PoolEvent : sig +module Pool_event : sig + val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t (** [wait_too_few_connections pool] is determined when the number of connections drops below the desired level. *) @@ -164,54 +168,9 @@ module PoolEvent : sig val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t (** [wait_new_connection pool] is determined when a new connection is succesfully established in the pool. *) + end -module LogEvent : sig - type t = - (** Pool-level events *) - - | Too_few_connections - | Too_many_connections - - | New_point of Point.t - | New_peer of Peer_id.t - - (** Connection-level events *) - - | Incoming_connection of Point.t - (** We accept(2)-ed an incoming connection *) - | Outgoing_connection of Point.t - (** We connect(2)-ed to a remote endpoint *) - | Authentication_failed of Point.t - (** Remote point failed authentication *) - - | Accepting_request of Point.t * Id_point.t * Peer_id.t - (** We accepted a connection after authentifying the remote peer. *) - | Rejecting_request of Point.t * Id_point.t * Peer_id.t - (** We rejected a connection after authentifying the remote peer. *) - | Request_rejected of Point.t * (Id_point.t * Peer_id.t) option - (** The remote peer rejected our connection. *) - - | Connection_established of Id_point.t * Peer_id.t - (** We succesfully established a authentified connection. *) - - | Disconnection of Peer_id.t - (** We decided to close the connection. *) - | External_disconnection of Peer_id.t - (** The connection was closed for external reason. *) - - | Gc_points - (** Garbage collection of known point table has been triggered. *) - | Gc_peer_ids - (** Garbage collection of known peer_ids table has been triggered. *) - - val encoding : t Data_encoding.t -end - -val watch: ('msg, 'meta) pool -> LogEvent.t Lwt_stream.t * Watcher.stopper -(** [watch pool] is a [stream, close] a [stream] of events and a - [close] function for this stream. *) - (** {1 Connections management} *) type ('msg, 'meta) connection @@ -245,17 +204,30 @@ val disconnect: (** [disconnect conn] cleanly closes [conn] and returns after [conn]'s internal worker has returned. *) -val connection_info: ('msg, 'meta) connection -> Connection_info.t +module Connection : sig -val connection_stat: ('msg, 'meta) connection -> Stat.t -(** [stat conn] is a snapshot of current bandwidth usage for - [conn]. *) + val info: ('msg, 'meta) connection -> Connection_info.t -val fold_connections: - ('msg, 'meta) pool -> - init:'a -> - f:(Peer_id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> - 'a + val stat: ('msg, 'meta) connection -> Stat.t + (** [stat conn] is a snapshot of current bandwidth usage for + [conn]. *) + + val fold: + ('msg, 'meta) pool -> + init:'a -> + f:(Peer_id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> + 'a + + val list: + ('msg, 'meta) pool -> (Peer_id.t * ('msg, 'meta) connection) list + + val find_by_point: + ('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option + + val find_by_peer_id: + ('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) connection option + +end val on_new_connection: ('msg, 'meta) pool -> @@ -304,17 +276,14 @@ module Peer_ids : sig val info: ('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) info option - val get_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta option + val get_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta val set_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta -> unit - val get_score: ('msg, 'meta) pool -> Peer_id.t -> float option + val get_score: ('msg, 'meta) pool -> Peer_id.t -> float val get_trusted: ('msg, 'meta) pool -> Peer_id.t -> bool val set_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit val unset_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit - val find_connection: - ('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) connection option - val fold_known: ('msg, 'meta) pool -> init:'a -> @@ -342,9 +311,6 @@ module Points : sig val set_trusted: ('msg, 'meta) pool -> Point.t -> unit val unset_trusted: ('msg, 'meta) pool -> Point.t -> unit - val find_connection: - ('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option - val fold_known: ('msg, 'meta) pool -> init:'a -> @@ -359,6 +325,70 @@ module Points : sig end +module Log_event : sig + + type t = + + (** Pool-level events *) + + | Too_few_connections + | Too_many_connections + + | New_point of Point.t + | New_peer of Peer_id.t + + | Gc_points + (** Garbage collection of known point table has been triggered. *) + | Gc_peer_ids + (** Garbage collection of known peer_ids table has been triggered. *) + + (** Connection-level events *) + + | Incoming_connection of Point.t + (** We accept(2)-ed an incoming connection *) + | Outgoing_connection of Point.t + (** We connect(2)-ed to a remote endpoint *) + | Authentication_failed of Point.t + (** Remote point failed authentication *) + + | Accepting_request of Point.t * Id_point.t * Peer_id.t + (** We accepted a connection after authentifying the remote peer. *) + | Rejecting_request of Point.t * Id_point.t * Peer_id.t + (** We rejected a connection after authentifying the remote peer. *) + | Request_rejected of Point.t * (Id_point.t * Peer_id.t) option + (** The remote peer rejected our connection. *) + + | Connection_established of Id_point.t * Peer_id.t + (** We succesfully established a authentified connection. *) + + | Swap_request_received of { source : Peer_id.t } + (** A swap request has been received. *) + | Swap_ack_received of { source : Peer_id.t } + (** A swap ack has been received *) + | Swap_request_sent of { source : Peer_id.t } + (** A swap request has been sent *) + | Swap_ack_sent of { source : Peer_id.t } + (** A swap ack has been sent *) + | Swap_request_ignored of { source : Peer_id.t } + (** A swap request has been ignored *) + | Swap_success of { source : Peer_id.t } + (** A swap operation has succeeded *) + | Swap_failure of { source : Peer_id.t } + (** A swap operation has failed *) + + | Disconnection of Peer_id.t + (** We decided to close the connection. *) + | External_disconnection of Peer_id.t + (** The connection was closed for external reason. *) + + val encoding : t Data_encoding.t + +end + +val watch: ('msg, 'meta) pool -> Log_event.t Lwt_stream.t * Watcher.stopper +(** [watch pool] is a [stream, close] a [stream] of events and a + [close] function for this stream. *) + (**/**) module Message : sig @@ -366,6 +396,8 @@ module Message : sig type 'msg t = | Bootstrap | Advertise of Point.t list + | Swap_request of Point.t * Peer_id.t + | Swap_ack of Point.t * Peer_id.t | Message of 'msg | Disconnect diff --git a/src/node/net/p2p_connection_pool_types.mli b/src/node/net/p2p_connection_pool_types.mli index 64bf0990a..4740678d5 100644 --- a/src/node/net/p2p_connection_pool_types.mli +++ b/src/node/net/p2p_connection_pool_types.mli @@ -133,6 +133,7 @@ module Point_info : sig } val encoding : t Data_encoding.t + end val fold_events : diff --git a/src/node/net/p2p_maintenance.ml b/src/node/net/p2p_maintenance.ml index 05b10fc81..aa20f9976 100644 --- a/src/node/net/p2p_maintenance.ml +++ b/src/node/net/p2p_maintenance.ml @@ -29,7 +29,7 @@ type 'meta t = { disco: P2p_discovery.t option ; just_maintained: unit Lwt_condition.t ; please_maintain: unit Lwt_condition.t ; - mutable worker : unit Lwt.t ; + mutable maintain_worker : unit Lwt.t ; } (** Select [expected] points amongst the disconnected known points. @@ -37,6 +37,7 @@ type 'meta t = { failed after [start_time]. It first selects points with the oldest last tentative. *) let connectable st start_time expected = + let Pool pool = st.pool in let now = Time.now () in let module Bounded_point_info = Utils.Bounded(struct @@ -49,9 +50,7 @@ let connectable st start_time expected = | Some t1, Some t2 -> Time.compare t2 t1 end) in let acc = Bounded_point_info.create expected in - let Pool pool = st.pool in - P2p_connection_pool.Points.fold_known - pool ~init:() + P2p_connection_pool.Points.fold_known pool ~init:() ~f:begin fun point pi () -> match Point_info.State.get pi with | Disconnected -> begin @@ -125,7 +124,7 @@ and too_few_connections st n_connected = P2p_connection_pool.broadcast_bootstrap_msg pool ; Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt.pick [ - P2p_connection_pool.PoolEvent.wait_new_peer pool ; + P2p_connection_pool.Pool_event.wait_new_peer pool ; Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? or wait for the existence of a non grey-listed peer ?? *) @@ -139,7 +138,7 @@ and too_many_connections st n_connected = (* too many connections, start the russian roulette *) let to_kill = n_connected - st.bounds.max_target in lwt_debug "Too many connections, will kill %d" to_kill >>= fun () -> - snd @@ P2p_connection_pool.fold_connections pool + snd @@ P2p_connection_pool.Connection.fold pool ~init:(to_kill, Lwt.return_unit) ~f:(fun _ conn (i, t) -> if i = 0 then (0, t) @@ -148,36 +147,46 @@ and too_many_connections st n_connected = maintain st let rec worker_loop st = + let Pool pool = st.pool in begin - let Pool pool = st.pool in Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt.pick [ Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_condition.wait st.please_maintain ; (* when asked *) - P2p_connection_pool.PoolEvent.wait_too_few_connections pool ; (* limits *) - P2p_connection_pool.PoolEvent.wait_too_many_connections pool + P2p_connection_pool.Pool_event.wait_too_few_connections pool ; (* limits *) + P2p_connection_pool.Pool_event.wait_too_many_connections pool ] >>= fun () -> return () end >>=? fun () -> - maintain st + let n_connected = P2p_connection_pool.active_connections pool in + if n_connected < st.bounds.min_threshold + || st.bounds.max_threshold < n_connected then + maintain st + else begin + P2p_connection_pool.send_swap_request pool ; + return () + end end >>= function | Ok () -> worker_loop st | Error [Lwt_utils.Canceled] -> Lwt.return_unit | Error _ -> Lwt.return_unit -let run ?(connection_timeout = 5.) bounds pool disco = +let run ~connection_timeout bounds pool disco = let canceler = Canceler.create () in let st = { - canceler ; connection_timeout ; - bounds ; pool = Pool pool ; disco ; + canceler ; + connection_timeout ; + bounds ; + pool = Pool pool ; + disco ; just_maintained = Lwt_condition.create () ; please_maintain = Lwt_condition.create () ; - worker = Lwt.return_unit ; + maintain_worker = Lwt.return_unit ; } in - st.worker <- + st.maintain_worker <- Lwt_utils.worker "maintenance" (fun () -> worker_loop st) - (fun () -> Canceler.cancel canceler); + (fun () -> Canceler.cancel canceler) ; st let maintain { just_maintained ; please_maintain } = @@ -185,8 +194,12 @@ let maintain { just_maintained ; please_maintain } = Lwt_condition.broadcast please_maintain () ; wait -let shutdown { canceler ; worker ; just_maintained } = +let shutdown { + canceler ; + maintain_worker ; + just_maintained } = Canceler.cancel canceler >>= fun () -> - worker >>= fun () -> + maintain_worker >>= fun () -> Lwt_condition.broadcast just_maintained () ; Lwt.return_unit + diff --git a/src/node/net/p2p_maintenance.mli b/src/node/net/p2p_maintenance.mli index 1398d2527..950edc9b1 100644 --- a/src/node/net/p2p_maintenance.mli +++ b/src/node/net/p2p_maintenance.mli @@ -34,7 +34,7 @@ type 'meta t (** Type of a maintenance worker. *) val run: - ?connection_timeout:float -> + connection_timeout:float -> bounds -> ('msg, 'meta) P2p_connection_pool.t -> P2p_discovery.t option -> diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index 5b0d71156..ea163fb24 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -70,7 +70,7 @@ let init_p2p net_params = match net_params with | None -> lwt_log_notice "P2P layer is disabled" >>= fun () -> - Lwt.return P2p.faked_network + Lwt.return (P2p.faked_network Distributed_db_metadata.cfg) | Some (config, limits) -> lwt_log_notice "bootstraping network..." >>= fun () -> P2p.create diff --git a/test/test_p2p_connection_pool.ml b/test/test_p2p_connection_pool.ml index 175fdcd6e..01f9bd2f0 100644 --- a/test/test_p2p_connection_pool.ml +++ b/test/test_p2p_connection_pool.ml @@ -40,7 +40,7 @@ let rec connect ~timeout pool point = lwt_log_info "Connect to %a" Point.pp point >>= fun () -> P2p_connection_pool.connect pool point ~timeout >>= function | Error [P2p_connection_pool.Connected] -> begin - match P2p_connection_pool.Points.find_connection pool point with + match P2p_connection_pool.Connection.find_by_point pool point with | Some conn -> return conn | None -> failwith "Woops..." end @@ -148,6 +148,7 @@ let make_net points repeat n = known_points_history_size = 100 ; max_known_points = None ; max_known_peer_ids = None ; + swap_linger = 0. ; } in Process.detach ~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id)