diff --git a/src/lib_base/p2p_connection.ml b/src/lib_base/p2p_connection.ml index 47488bfec..89b66e03c 100644 --- a/src/lib_base/p2p_connection.ml +++ b/src/lib_base/p2p_connection.ml @@ -7,10 +7,6 @@ (* *) (**************************************************************************) -type peer_id = Crypto_box.Public_key_hash.t -let peer_id_encoding = Crypto_box.Public_key_hash.encoding -let peer_id_pp = Crypto_box.Public_key_hash.pp - module Id = struct (* A net point (address x port). *) @@ -58,10 +54,10 @@ module Table = Hashtbl.Make (Id) module Info = struct type t = { - incoming : bool; - peer_id : peer_id; - id_point : Id.t; - remote_socket_port : P2p_addr.port; + incoming : bool ; + peer_id : P2p_peer_id.t ; + id_point : Id.t ; + remote_socket_port : P2p_addr.port ; versions : P2p_version.t list ; } @@ -74,7 +70,7 @@ module Info = struct { incoming ; peer_id ; id_point ; remote_socket_port ; versions }) (obj5 (req "incoming" bool) - (req "peer_id" peer_id_encoding) + (req "peer_id" P2p_peer_id.encoding) (req "id_point" Id.encoding) (req "remote_socket_port" uint16) (req "versions" (list P2p_version.encoding))) @@ -88,7 +84,7 @@ module Info = struct | Some port -> remote_addr, port in Format.fprintf ppf "%s %a %a (%a)" (if incoming then "↘" else "↗") - peer_id_pp peer_id + P2p_peer_id.pp peer_id P2p_point.Id.pp point P2p_version.pp version @@ -104,7 +100,7 @@ module Pool_event = struct | Too_many_connections | New_point of P2p_point.Id.t - | New_peer of peer_id + | New_peer of P2p_peer_id.t | Gc_points | Gc_peer_ids @@ -112,21 +108,21 @@ module Pool_event = struct | Incoming_connection of P2p_point.Id.t | Outgoing_connection of P2p_point.Id.t | Authentication_failed of P2p_point.Id.t - | Accepting_request of P2p_point.Id.t * Id.t * peer_id - | Rejecting_request of P2p_point.Id.t * Id.t * peer_id - | Request_rejected of P2p_point.Id.t * (Id.t * peer_id) option - | Connection_established of Id.t * peer_id + | Accepting_request of P2p_point.Id.t * Id.t * P2p_peer_id.t + | Rejecting_request of P2p_point.Id.t * Id.t * P2p_peer_id.t + | Request_rejected of P2p_point.Id.t * (Id.t * P2p_peer_id.t) option + | Connection_established of Id.t * P2p_peer_id.t - | Swap_request_received of { source : peer_id } - | Swap_ack_received of { source : peer_id } - | Swap_request_sent of { source : peer_id } - | Swap_ack_sent of { source : peer_id } - | Swap_request_ignored of { source : peer_id } - | Swap_success of { source : peer_id } - | Swap_failure of { source : peer_id } + | Swap_request_received of { source : P2p_peer_id.t } + | Swap_ack_received of { source : P2p_peer_id.t } + | Swap_request_sent of { source : P2p_peer_id.t } + | Swap_ack_sent of { source : P2p_peer_id.t } + | Swap_request_ignored of { source : P2p_peer_id.t } + | Swap_success of { source : P2p_peer_id.t } + | Swap_failure of { source : P2p_peer_id.t } - | Disconnection of peer_id - | External_disconnection of peer_id + | Disconnection of P2p_peer_id.t + | External_disconnection of P2p_peer_id.t let encoding = let open Data_encoding in @@ -146,7 +142,7 @@ module Pool_event = struct (function New_point p -> Some p | _ -> None) (fun p -> New_point p) ; case (Tag 3) (branch_encoding "new_peer" - (obj1 (req "peer_id" peer_id_encoding))) + (obj1 (req "peer_id" P2p_peer_id.encoding))) (function New_peer p -> Some p | _ -> None) (fun p -> New_peer p) ; case (Tag 4) (branch_encoding "incoming_connection" @@ -165,7 +161,7 @@ module Pool_event = struct (obj3 (req "point" P2p_point.Id.encoding) (req "id_point" Id.encoding) - (req "peer_id" peer_id_encoding))) + (req "peer_id" P2p_peer_id.encoding))) (function Accepting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None) (fun (p, id_p, g) -> Accepting_request (p, id_p, g)) ; @@ -173,7 +169,7 @@ module Pool_event = struct (obj3 (req "point" P2p_point.Id.encoding) (req "id_point" Id.encoding) - (req "peer_id" peer_id_encoding))) + (req "peer_id" P2p_peer_id.encoding))) (function Rejecting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None) (fun (p, id_p, g) -> Rejecting_request (p, id_p, g)) ; @@ -181,22 +177,22 @@ module Pool_event = struct (obj2 (req "point" P2p_point.Id.encoding) (opt "identity" - (tup2 Id.encoding peer_id_encoding)))) + (tup2 Id.encoding P2p_peer_id.encoding)))) (function Request_rejected (p, id) -> Some (p, id) | _ -> None) (fun (p, id) -> Request_rejected (p, id)) ; case (Tag 10) (branch_encoding "connection_established" (obj2 (req "id_point" Id.encoding) - (req "peer_id" peer_id_encoding))) + (req "peer_id" P2p_peer_id.encoding))) (function Connection_established (id_p, g) -> Some (id_p, g) | _ -> None) (fun (id_p, g) -> Connection_established (id_p, g)) ; case (Tag 11) (branch_encoding "disconnection" - (obj1 (req "peer_id" peer_id_encoding))) + (obj1 (req "peer_id" P2p_peer_id.encoding))) (function Disconnection g -> Some g | _ -> None) (fun g -> Disconnection g) ; case (Tag 12) (branch_encoding "external_disconnection" - (obj1 (req "peer_id" peer_id_encoding))) + (obj1 (req "peer_id" P2p_peer_id.encoding))) (function External_disconnection g -> Some g | _ -> None) (fun g -> External_disconnection g) ; case (Tag 13) (branch_encoding "gc_points" empty) @@ -206,43 +202,43 @@ module Pool_event = struct (function Gc_peer_ids -> Some () | _ -> None) (fun () -> Gc_peer_ids) ; case (Tag 15) (branch_encoding "swap_request_received" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_request_received { source } -> Some source | _ -> None) (fun source -> Swap_request_received { source }) ; case (Tag 16) (branch_encoding "swap_ack_received" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_ack_received { source } -> Some source | _ -> None) (fun source -> Swap_ack_received { source }) ; case (Tag 17) (branch_encoding "swap_request_sent" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_request_sent { source } -> Some source | _ -> None) (fun source -> Swap_request_sent { source }) ; case (Tag 18) (branch_encoding "swap_ack_sent" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_ack_sent { source } -> Some source | _ -> None) (fun source -> Swap_ack_sent { source }) ; case (Tag 19) (branch_encoding "swap_request_ignored" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_request_ignored { source } -> Some source | _ -> None) (fun source -> Swap_request_ignored { source }) ; case (Tag 20) (branch_encoding "swap_success" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_success { source } -> Some source | _ -> None) (fun source -> Swap_success { source }) ; case (Tag 21) (branch_encoding "swap_failure" - (obj1 (req "source" peer_id_encoding))) + (obj1 (req "source" P2p_peer_id.encoding))) (function | Swap_failure { source } -> Some source | _ -> None) diff --git a/src/lib_base/p2p_connection.mli b/src/lib_base/p2p_connection.mli index 207add08b..1e7f84115 100644 --- a/src/lib_base/p2p_connection.mli +++ b/src/lib_base/p2p_connection.mli @@ -7,9 +7,6 @@ (* *) (**************************************************************************) -type peer_id = Crypto_box.Public_key_hash.t -(* = P2p_peer.Id.t, but we should break cycles *) - module Id : sig type t = P2p_addr.t * P2p_addr.port option @@ -36,7 +33,7 @@ module Info : sig type t = { incoming : bool; - peer_id : peer_id; + peer_id : P2p_peer_id.t; id_point : Id.t; remote_socket_port : P2p_addr.port; versions : P2p_version.t list ; @@ -55,7 +52,7 @@ module Pool_event : sig | Too_many_connections | New_point of P2p_point.Id.t - | New_peer of peer_id + | New_peer of P2p_peer_id.t | Gc_points (** Garbage collection of known point table has been triggered. *) @@ -72,34 +69,34 @@ module Pool_event : sig | Authentication_failed of P2p_point.Id.t (** Remote point failed authentication *) - | Accepting_request of P2p_point.Id.t * Id.t * peer_id + | Accepting_request of P2p_point.Id.t * Id.t * P2p_peer_id.t (** We accepted a connection after authentifying the remote peer. *) - | Rejecting_request of P2p_point.Id.t * Id.t * peer_id + | Rejecting_request of P2p_point.Id.t * Id.t * P2p_peer_id.t (** We rejected a connection after authentifying the remote peer. *) - | Request_rejected of P2p_point.Id.t * (Id.t * peer_id) option + | Request_rejected of P2p_point.Id.t * (Id.t * P2p_peer_id.t) option (** The remote peer rejected our connection. *) - | Connection_established of Id.t * peer_id + | Connection_established of Id.t * P2p_peer_id.t (** We succesfully established a authentified connection. *) - | Swap_request_received of { source : peer_id } + | Swap_request_received of { source : P2p_peer_id.t } (** A swap request has been received. *) - | Swap_ack_received of { source : peer_id } + | Swap_ack_received of { source : P2p_peer_id.t } (** A swap ack has been received *) - | Swap_request_sent of { source : peer_id } + | Swap_request_sent of { source : P2p_peer_id.t } (** A swap request has been sent *) - | Swap_ack_sent of { source : peer_id } + | Swap_ack_sent of { source : P2p_peer_id.t } (** A swap ack has been sent *) - | Swap_request_ignored of { source : peer_id } + | Swap_request_ignored of { source : P2p_peer_id.t } (** A swap request has been ignored *) - | Swap_success of { source : peer_id } + | Swap_success of { source : P2p_peer_id.t } (** A swap operation has succeeded *) - | Swap_failure of { source : peer_id } + | Swap_failure of { source : P2p_peer_id.t } (** A swap operation has failed *) - | Disconnection of peer_id + | Disconnection of P2p_peer_id.t (** We decided to close the connection. *) - | External_disconnection of peer_id + | External_disconnection of P2p_peer_id.t (** The connection was closed for external reason. *) val encoding : t Data_encoding.t diff --git a/src/lib_base/p2p_peer.ml b/src/lib_base/p2p_peer.ml index a2e773320..230b22f19 100644 --- a/src/lib_base/p2p_peer.ml +++ b/src/lib_base/p2p_peer.ml @@ -7,9 +7,7 @@ (* *) (**************************************************************************) -open Error_monad - -module Id = Tezos_crypto.Crypto_box.Public_key_hash +module Id = P2p_peer_id module Table = Id.Table module Map = Id.Map @@ -90,7 +88,7 @@ module Info = struct end -module Event = struct +module Pool_event = struct type kind = | Accepting_request @@ -130,210 +128,3 @@ module Event = struct (opt "port" int16)) end - -module Pool_info = struct - - type 'data state = - | Accepted of { current_point: P2p_connection.Id.t ; - cancel: Lwt_canceler.t } - | Running of { data: 'data ; - current_point: P2p_connection.Id.t } - | Disconnected - - type ('conn, 'meta) t = { - peer_id : Id.t ; - created : Time.t ; - mutable state : 'conn state ; - mutable metadata : 'meta ; - mutable trusted : bool ; - mutable last_failed_connection : (P2p_connection.Id.t * Time.t) option ; - mutable last_rejected_connection : (P2p_connection.Id.t * Time.t) option ; - mutable last_established_connection : (P2p_connection.Id.t * Time.t) option ; - mutable last_disconnection : (P2p_connection.Id.t * Time.t) option ; - events : Event.t Ring.t ; - watchers : Event.t Lwt_watcher.input ; - } - type ('conn, 'meta) peer_info = ('conn, 'meta) t - - let compare gi1 gi2 = Id.compare gi1.peer_id gi2.peer_id - - let log_size = 100 - - let create ?(created = Time.now ()) ?(trusted = false) ~metadata peer_id = - { peer_id ; - created ; - state = Disconnected ; - metadata ; - trusted ; - last_failed_connection = None ; - last_rejected_connection = None ; - last_established_connection = None ; - last_disconnection = None ; - events = Ring.create log_size ; - watchers = Lwt_watcher.create_input () ; - } - - let encoding metadata_encoding = - let open Data_encoding in - conv - (fun { peer_id ; trusted ; metadata ; events ; created ; - last_failed_connection ; last_rejected_connection ; - last_established_connection ; last_disconnection ; _ } -> - (peer_id, created, trusted, metadata, Ring.elements events, - last_failed_connection, last_rejected_connection, - last_established_connection, last_disconnection)) - (fun (peer_id, created, trusted, metadata, event_list, - last_failed_connection, last_rejected_connection, - last_established_connection, last_disconnection) -> - let info = create ~trusted ~metadata peer_id in - let events = Ring.create log_size in - Ring.add_list info.events event_list ; - { state = Disconnected ; - trusted ; peer_id ; metadata ; created ; - last_failed_connection ; - last_rejected_connection ; - last_established_connection ; - last_disconnection ; - events ; - watchers = Lwt_watcher.create_input () ; - }) - (obj9 - (req "peer_id" Id.encoding) - (req "created" Time.encoding) - (dft "trusted" bool false) - (req "metadata" metadata_encoding) - (dft "events" (list Event.encoding) []) - (opt "last_failed_connection" - (tup2 P2p_connection.Id.encoding Time.encoding)) - (opt "last_rejected_connection" - (tup2 P2p_connection.Id.encoding Time.encoding)) - (opt "last_established_connection" - (tup2 P2p_connection.Id.encoding Time.encoding)) - (opt "last_disconnection" - (tup2 P2p_connection.Id.encoding Time.encoding))) - - let peer_id { peer_id ; _ } = peer_id - let created { created ; _ } = created - let metadata { metadata ; _ } = metadata - let set_metadata gi metadata = gi.metadata <- metadata - let trusted { trusted ; _ } = trusted - let set_trusted gi = gi.trusted <- true - let unset_trusted gi = gi.trusted <- false - let last_established_connection s = s.last_established_connection - let last_disconnection s = s.last_disconnection - let last_failed_connection s = s.last_failed_connection - let last_rejected_connection s = s.last_rejected_connection - - let last_seen s = - Time.recent - s.last_established_connection - (Time.recent s.last_rejected_connection s.last_disconnection) - let last_miss s = - Time.recent - s.last_failed_connection - (Time.recent s.last_rejected_connection s.last_disconnection) - - let log { events ; watchers ; _ } ?(timestamp = Time.now ()) point kind = - let event = { Event.kind ; timestamp ; point } in - Ring.add events event ; - Lwt_watcher.notify watchers event - - let log_incoming_rejection ?timestamp peer_info point = - log peer_info ?timestamp point Rejecting_request - - module File = struct - - let load path metadata_encoding = - let enc = Data_encoding.list (encoding metadata_encoding) in - if path <> "/dev/null" && Sys.file_exists path then - Data_encoding_ezjsonm.read_file path >>=? fun json -> - return (Data_encoding.Json.destruct enc json) - else - return [] - - let save path metadata_encoding peers = - let open Data_encoding in - Data_encoding_ezjsonm.write_file path @@ - Json.construct (list (encoding metadata_encoding)) peers - - end - -end - -module Pool_event = struct - include Event - let watch { Pool_info.watchers ; _ } = Lwt_watcher.create_stream watchers - let fold { Pool_info.events ; _ } ~init ~f = Ring.fold events ~init ~f -end - -module Pool_state = struct - - type 'data t = 'data Pool_info.state = - | Accepted of { current_point: P2p_connection.Id.t ; - cancel: Lwt_canceler.t } - | Running of { data: 'data ; - current_point: P2p_connection.Id.t } - | Disconnected - type 'data state = 'data t - - let pp ppf = function - | Accepted { current_point ; _ } -> - Format.fprintf ppf "accepted %a" P2p_connection.Id.pp current_point - | Running { current_point ; _ } -> - Format.fprintf ppf "running %a" P2p_connection.Id.pp current_point - | Disconnected -> - Format.fprintf ppf "disconnected" - - let get { Pool_info.state ; _ } = state - - let is_disconnected { Pool_info.state ; _ } = - match state with - | Disconnected -> true - | Accepted _ | Running _ -> false - - let set_accepted - ?(timestamp = Time.now ()) - peer_info current_point cancel = - assert begin - match peer_info.Pool_info.state with - | Accepted _ | Running _ -> false - | Disconnected -> true - end ; - peer_info.state <- Accepted { current_point ; cancel } ; - Pool_info.log peer_info ~timestamp current_point Accepting_request - - let set_running - ?(timestamp = Time.now ()) - peer_info point data = - assert begin - match peer_info.Pool_info.state with - | Disconnected -> true (* request to unknown peer_id. *) - | Running _ -> false - | Accepted { current_point ; _ } -> - P2p_connection.Id.equal point current_point - end ; - peer_info.state <- Running { data ; current_point = point } ; - peer_info.last_established_connection <- Some (point, timestamp) ; - Pool_info.log peer_info ~timestamp point Connection_established - - let set_disconnected - ?(timestamp = Time.now ()) ?(requested = false) peer_info = - let current_point, (event : Event.kind) = - match peer_info.Pool_info.state with - | Accepted { current_point ; _ } -> - peer_info.last_rejected_connection <- - Some (current_point, timestamp) ; - current_point, Request_rejected - | Running { current_point ; _ } -> - peer_info.last_disconnection <- - Some (current_point, timestamp) ; - current_point, - if requested then Disconnection else External_disconnection - | Disconnected -> assert false - in - peer_info.state <- Disconnected ; - Pool_info.log peer_info ~timestamp current_point event - - - -end diff --git a/src/lib_base/p2p_peer.mli b/src/lib_base/p2p_peer.mli index 3c5a195ec..d6d86f05f 100644 --- a/src/lib_base/p2p_peer.mli +++ b/src/lib_base/p2p_peer.mli @@ -7,9 +7,7 @@ (* *) (**************************************************************************) -open Error_monad - -module Id = Tezos_crypto.Crypto_box.Public_key_hash +module Id = P2p_peer_id module Map = Id.Map module Set = Id.Set @@ -47,110 +45,6 @@ module Info : sig end -(** P2p_peer.Id info: current and historical information about a peer_id *) - -module Pool_info : sig - - type ('conn, 'meta) t - type ('conn, 'meta) peer_info = ('conn, 'meta) t - - val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int - - val create : - ?created:Time.t -> - ?trusted:bool -> - metadata:'meta -> - Id.t -> ('conn, 'meta) peer_info - (** [create ~trusted ~meta peer_id] is a freshly minted peer_id info for - [peer_id]. *) - - val peer_id : ('conn, 'meta) peer_info -> Id.t - - val created : ('conn, 'meta) peer_info -> Time.t - val metadata : ('conn, 'meta) peer_info -> 'meta - val set_metadata : ('conn, 'meta) peer_info -> 'meta -> unit - - val trusted : ('conn, 'meta) peer_info -> bool - val set_trusted : ('conn, 'meta) peer_info -> unit - val unset_trusted : ('conn, 'meta) peer_info -> unit - - val last_failed_connection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option - val last_rejected_connection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option - val last_established_connection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option - val last_disconnection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option - - val last_seen : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option - (** [last_seen gi] is the most recent of: - - * last established connection - * last rejected connection - * last disconnection - *) - - val last_miss : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option - (** [last_miss gi] is the most recent of: - - * last failed connection - * last rejected connection - * last disconnection - *) - - val log_incoming_rejection : - ?timestamp:Time.t -> - ('conn, 'meta) peer_info -> P2p_connection.Id.t -> unit - - module File : sig - val load : - string -> 'meta Data_encoding.t -> - ('conn, 'meta) peer_info list tzresult Lwt.t - val save : - string -> 'meta Data_encoding.t -> - ('conn, 'meta) peer_info list -> unit tzresult Lwt.t - end - -end - -module Pool_state : sig - - type 'conn t = - | Accepted of { current_point: P2p_connection.Id.t ; - cancel: Lwt_canceler.t } - (** We accepted a incoming connection, we greeted back and - we are waiting for an acknowledgement. *) - | Running of { data: 'conn ; - current_point: P2p_connection.Id.t } - (** Successfully authentificated connection, normal business. *) - | Disconnected - (** No connection established currently. *) - type 'conn state = 'conn t - - val pp : Format.formatter -> 'conn t -> unit - - val get : ('conn, 'meta) Pool_info.t -> 'conn state - - val is_disconnected : ('conn, 'meta) Pool_info.t -> bool - - val set_accepted : - ?timestamp:Time.t -> - ('conn, 'meta) Pool_info.t -> P2p_connection.Id.t -> Lwt_canceler.t -> unit - - val set_running : - ?timestamp:Time.t -> - ('conn, 'meta) Pool_info.t -> P2p_connection.Id.t -> 'conn -> unit - - val set_disconnected : - ?timestamp:Time.t -> - ?requested:bool -> - ('conn, 'meta) Pool_info.t -> unit - -end - module Pool_event : sig type kind = @@ -175,10 +69,4 @@ module Pool_event : sig val encoding : t Data_encoding.t - val fold : - ('conn, 'meta) Pool_info.t -> init:'a -> f:('a -> t -> 'a) -> 'a - - val watch : - ('conn, 'meta) Pool_info.t -> t Lwt_stream.t * Lwt_watcher.stopper - end diff --git a/src/lib_base/p2p_connection_id.ml b/src/lib_base/p2p_peer_id.ml similarity index 94% rename from src/lib_base/p2p_connection_id.ml rename to src/lib_base/p2p_peer_id.ml index 401a282ee..7dfb5e1df 100644 --- a/src/lib_base/p2p_connection_id.ml +++ b/src/lib_base/p2p_peer_id.ml @@ -7,3 +7,5 @@ (* *) (**************************************************************************) +include Crypto_box.Public_key_hash + diff --git a/src/lib_base/p2p_connection_id.mli b/src/lib_base/p2p_peer_id.mli similarity index 88% rename from src/lib_base/p2p_connection_id.mli rename to src/lib_base/p2p_peer_id.mli index d1866cd2a..6465f0d4e 100644 --- a/src/lib_base/p2p_connection_id.mli +++ b/src/lib_base/p2p_peer_id.mli @@ -7,5 +7,4 @@ (* *) (**************************************************************************) -(** P2p_point representing a reachable socket address *) - +include Tezos_crypto.S.INTERNAL_HASH with type t = Crypto_box.Public_key_hash.t diff --git a/src/lib_base/p2p_point.ml b/src/lib_base/p2p_point.ml index 356615027..db7a6c843 100644 --- a/src/lib_base/p2p_point.ml +++ b/src/lib_base/p2p_point.ml @@ -7,11 +7,6 @@ (* *) (**************************************************************************) -type peer_id = Crypto_box.Public_key_hash.t -let peer_id_encoding = Crypto_box.Public_key_hash.encoding -let peer_id_pp = Crypto_box.Public_key_hash.pp -let peer_id_equal = Crypto_box.Public_key_hash.equal - module Id = struct (* A net point (address x port). *) @@ -108,11 +103,11 @@ module State = struct type t = | Requested - | Accepted of peer_id - | Running of peer_id + | Accepted of P2p_peer_id.t + | Running of P2p_peer_id.t | Disconnected - let of_peer_id = function + let of_p2p_peer_id = function | Requested -> None | Accepted pi -> Some pi | Running pi -> Some pi @@ -143,13 +138,13 @@ module State = struct (function Requested -> Some () | _ -> None) (fun () -> Requested) ; case (Tag 1) (branch_encoding "accepted" - (obj1 (req "peer_id" peer_id_encoding))) - (function Accepted peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Accepted peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function Accepted p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Accepted p2p_peer_id) ; case (Tag 2) (branch_encoding "running" - (obj1 (req "peer_id" peer_id_encoding))) - (function Running peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Running peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function Running p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Running p2p_peer_id) ; case (Tag 3) (branch_encoding "disconnected" empty) (function Disconnected -> Some () | _ -> None) (fun () -> Disconnected) ; @@ -164,10 +159,10 @@ module Info = struct greylisted_until : Time.t ; state : State.t ; last_failed_connection : Time.t option ; - last_rejected_connection : (peer_id * Time.t) option ; - last_established_connection : (peer_id * Time.t) option ; - last_disconnection : (peer_id * Time.t) option ; - last_seen : (peer_id * Time.t) option ; + last_rejected_connection : (P2p_peer_id.t * Time.t) option ; + last_established_connection : (P2p_peer_id.t * Time.t) option ; + last_disconnection : (P2p_peer_id.t * Time.t) option ; + last_seen : (P2p_peer_id.t * Time.t) option ; last_miss : Time.t option ; } @@ -178,16 +173,16 @@ module Info = struct last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; last_seen ; last_miss } -> - let peer_id = State.of_peer_id state in - (trusted, greylisted_until, state, peer_id, + let p2p_peer_id = State.of_p2p_peer_id state in + (trusted, greylisted_until, state, p2p_peer_id, last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection, last_seen, last_miss)) - (fun (trusted, greylisted_until, state, peer_id, + (fun (trusted, greylisted_until, state, p2p_peer_id, last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection, last_seen, last_miss) -> - let state = State.of_peerid_state state peer_id in + let state = State.of_peerid_state state p2p_peer_id in { trusted ; greylisted_until ; state ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; @@ -196,26 +191,26 @@ module Info = struct (req "trusted" bool) (dft "greylisted_until" Time.encoding Time.epoch) (req "state" State.encoding) - (opt "peer_id" peer_id_encoding) + (opt "p2p_peer_id" P2p_peer_id.encoding) (opt "last_failed_connection" Time.encoding) - (opt "last_rejected_connection" (tup2 peer_id_encoding Time.encoding)) - (opt "last_established_connection" (tup2 peer_id_encoding Time.encoding)) - (opt "last_disconnection" (tup2 peer_id_encoding Time.encoding)) - (opt "last_seen" (tup2 peer_id_encoding Time.encoding)) + (opt "last_rejected_connection" (tup2 P2p_peer_id.encoding Time.encoding)) + (opt "last_established_connection" (tup2 P2p_peer_id.encoding Time.encoding)) + (opt "last_disconnection" (tup2 P2p_peer_id.encoding Time.encoding)) + (opt "last_seen" (tup2 P2p_peer_id.encoding Time.encoding)) (opt "last_miss" Time.encoding)) end -module Event = struct +module Pool_event = struct type kind = | Outgoing_request - | Accepting_request of peer_id - | Rejecting_request of peer_id - | Request_rejected of peer_id option - | Connection_established of peer_id - | Disconnection of peer_id - | External_disconnection of peer_id + | Accepting_request of P2p_peer_id.t + | Rejecting_request of P2p_peer_id.t + | Request_rejected of P2p_peer_id.t option + | Connection_established of P2p_peer_id.t + | Disconnection of P2p_peer_id.t + | External_disconnection of P2p_peer_id.t let kind_encoding = let open Data_encoding in @@ -228,29 +223,29 @@ module Event = struct (function Outgoing_request -> Some () | _ -> None) (fun () -> Outgoing_request) ; case (Tag 1) (branch_encoding "accepting_request" - (obj1 (req "peer_id" peer_id_encoding))) - (function Accepting_request peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Accepting_request peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function Accepting_request p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Accepting_request p2p_peer_id) ; case (Tag 2) (branch_encoding "rejecting_request" - (obj1 (req "peer_id" peer_id_encoding))) - (function Rejecting_request peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Rejecting_request peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function Rejecting_request p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Rejecting_request p2p_peer_id) ; case (Tag 3) (branch_encoding "request_rejected" - (obj1 (opt "peer_id" peer_id_encoding))) - (function Request_rejected peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Request_rejected peer_id) ; + (obj1 (opt "p2p_peer_id" P2p_peer_id.encoding))) + (function Request_rejected p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Request_rejected p2p_peer_id) ; case (Tag 4) (branch_encoding "rejecting_request" - (obj1 (req "peer_id" peer_id_encoding))) - (function Connection_established peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Connection_established peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function Connection_established p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Connection_established p2p_peer_id) ; case (Tag 5) (branch_encoding "rejecting_request" - (obj1 (req "peer_id" peer_id_encoding))) - (function Disconnection peer_id -> Some peer_id | _ -> None) - (fun peer_id -> Disconnection peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function Disconnection p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> Disconnection p2p_peer_id) ; case (Tag 6) (branch_encoding "rejecting_request" - (obj1 (req "peer_id" peer_id_encoding))) - (function External_disconnection peer_id -> Some peer_id | _ -> None) - (fun peer_id -> External_disconnection peer_id) ; + (obj1 (req "p2p_peer_id" P2p_peer_id.encoding))) + (function External_disconnection p2p_peer_id -> Some p2p_peer_id | _ -> None) + (fun p2p_peer_id -> External_disconnection p2p_peer_id) ; ] type t = { @@ -267,211 +262,3 @@ module Event = struct (req "kind" kind_encoding) (req "timestamp" Time.encoding)) end - -module Pool_info = struct - - type 'data state = - | Requested of { cancel: Lwt_canceler.t } - | Accepted of { current_peer_id: peer_id ; - cancel: Lwt_canceler.t } - | Running of { data: 'data ; - current_peer_id: peer_id } - | Disconnected - - type greylisting_config = { - factor: float ; - initial_delay: int ; - disconnection_delay: int ; - } - - type 'data t = { - point : Id.t ; - mutable trusted : bool ; - mutable state : 'data state ; - mutable last_failed_connection : Time.t option ; - mutable last_rejected_connection : (peer_id * Time.t) option ; - mutable last_established_connection : (peer_id * Time.t) option ; - mutable last_disconnection : (peer_id * Time.t) option ; - greylisting : greylisting_config ; - mutable greylisting_delay : float ; - mutable greylisting_end : Time.t ; - events : Event.t Ring.t ; - watchers : Event.t Lwt_watcher.input ; - } - type 'data point_info = 'data t - - let compare pi1 pi2 = Id.compare pi1.point pi2.point - - let log_size = 100 - - let default_greylisting_config = { - factor = 1.2 ; - initial_delay = 1 ; - disconnection_delay = 60 ; - } - - let create - ?(trusted = false) - ?(greylisting_config = default_greylisting_config) addr port = { - point = (addr, port) ; - trusted ; - state = Disconnected ; - last_failed_connection = None ; - last_rejected_connection = None ; - last_established_connection = None ; - last_disconnection = None ; - events = Ring.create log_size ; - greylisting = greylisting_config ; - greylisting_delay = 1. ; - greylisting_end = Time.epoch ; - watchers = Lwt_watcher.create_input () ; - } - - let point s = s.point - let trusted s = s.trusted - let set_trusted gi = gi.trusted <- true - let unset_trusted gi = gi.trusted <- false - let last_established_connection s = s.last_established_connection - let last_disconnection s = s.last_disconnection - let last_failed_connection s = s.last_failed_connection - let last_rejected_connection s = s.last_rejected_connection - let greylisted ?(now = Time.now ()) s = - Time.compare now s.greylisting_end <= 0 - let greylisted_until s = s.greylisting_end - - let last_seen s = - Time.recent s.last_rejected_connection - (Time.recent s.last_established_connection s.last_disconnection) - let last_miss s = - match - s.last_failed_connection, - (Option.map ~f:(fun (_, time) -> time) @@ - Time.recent s.last_rejected_connection s.last_disconnection) with - | (None, None) -> None - | (None, (Some _ as a)) - | (Some _ as a, None) -> a - | (Some t1 as a1 , (Some t2 as a2)) -> - if Time.compare t1 t2 < 0 then a2 else a1 - - let log { events ; watchers ; _ } ?(timestamp = Time.now ()) kind = - let event = { Event.kind ; timestamp } in - Ring.add events event ; - Lwt_watcher.notify watchers event - - let log_incoming_rejection ?timestamp point_info peer_id = - log point_info ?timestamp (Rejecting_request peer_id) - -end - -module Pool_event = struct - - include Event - - let fold { Pool_info.events ; _ } ~init ~f = Ring.fold events ~init ~f - - let watch { Pool_info.watchers ; _ } = Lwt_watcher.create_stream watchers - -end - -module Pool_state = struct - - type 'data t = 'data Pool_info.state = - | Requested of { cancel: Lwt_canceler.t } - | Accepted of { current_peer_id: peer_id ; - cancel: Lwt_canceler.t } - | Running of { data: 'data ; - current_peer_id: peer_id } - | Disconnected - type 'data state = 'data t - - let pp ppf = function - | Requested _ -> - Format.fprintf ppf "requested" - | Accepted { current_peer_id ; _ } -> - Format.fprintf ppf "accepted %a" peer_id_pp current_peer_id - | Running { current_peer_id ; _ } -> - Format.fprintf ppf "running %a" peer_id_pp current_peer_id - | Disconnected -> - Format.fprintf ppf "disconnected" - - let get { Pool_info.state ; _ } = state - - let is_disconnected { Pool_info.state ; _ } = - match state with - | Disconnected -> true - | Requested _ | Accepted _ | Running _ -> false - - let set_requested ?timestamp point_info cancel = - assert begin - match point_info.Pool_info.state with - | Requested _ -> true - | Accepted _ | Running _ -> false - | Disconnected -> true - end ; - point_info.state <- Requested { cancel } ; - Pool_info.log point_info ?timestamp Outgoing_request - - let set_accepted - ?(timestamp = Time.now ()) - point_info current_peer_id cancel = - (* log_notice "SET_ACCEPTED %a@." P2p_point.pp point_info.point ; *) - assert begin - match point_info.Pool_info.state with - | Accepted _ | Running _ -> false - | Requested _ | Disconnected -> true - end ; - point_info.state <- Accepted { current_peer_id ; cancel } ; - Pool_info.log point_info ~timestamp (Accepting_request current_peer_id) - - let set_running - ?(timestamp = Time.now ()) - point_info peer_id data = - assert begin - match point_info.Pool_info.state with - | Disconnected -> true (* request to unknown peer_id. *) - | Running _ -> false - | Accepted { current_peer_id ; _ } -> peer_id_equal peer_id current_peer_id - | Requested _ -> true - end ; - point_info.state <- Running { data ; current_peer_id = peer_id } ; - point_info.last_established_connection <- Some (peer_id, timestamp) ; - Pool_info.log point_info ~timestamp (Connection_established peer_id) - - let set_greylisted timestamp point_info = - point_info.Pool_info.greylisting_end <- - Time.add - timestamp - (Int64.of_float point_info.Pool_info.greylisting_delay) ; - point_info.greylisting_delay <- - point_info.greylisting_delay *. point_info.greylisting.factor - - let set_disconnected - ?(timestamp = Time.now ()) ?(requested = false) point_info = - let event : Event.kind = - match point_info.Pool_info.state with - | Requested _ -> - set_greylisted timestamp point_info ; - point_info.last_failed_connection <- Some timestamp ; - Request_rejected None - | Accepted { current_peer_id ; _ } -> - set_greylisted timestamp point_info ; - point_info.last_rejected_connection <- - Some (current_peer_id, timestamp) ; - Request_rejected (Some current_peer_id) - | Running { current_peer_id ; _ } -> - point_info.greylisting_delay <- - float_of_int point_info.greylisting.initial_delay ; - point_info.greylisting_end <- - Time.add timestamp - (Int64.of_int point_info.greylisting.disconnection_delay) ; - point_info.last_disconnection <- Some (current_peer_id, timestamp) ; - if requested - then Disconnection current_peer_id - else External_disconnection current_peer_id - | Disconnected -> - assert false - in - point_info.state <- Disconnected ; - Pool_info.log point_info ~timestamp event - -end diff --git a/src/lib_base/p2p_point.mli b/src/lib_base/p2p_point.mli index 90252f015..b806f532c 100644 --- a/src/lib_base/p2p_point.mli +++ b/src/lib_base/p2p_point.mli @@ -7,9 +7,6 @@ (* *) (**************************************************************************) -type peer_id = Crypto_box.Public_key_hash.t -(* = P2p_peer.Id.t, but we should break cycles *) - module Id : sig type t = P2p_addr.t * P2p_addr.port @@ -36,15 +33,15 @@ module State : sig type t = | Requested - | Accepted of peer_id - | Running of peer_id + | Accepted of P2p_peer_id.t + | Running of P2p_peer_id.t | Disconnected val pp_digram : Format.formatter -> t -> unit val encoding : t Data_encoding.t - val of_peer_id : t -> peer_id option - val of_peerid_state : t -> peer_id option -> t + val of_p2p_peer_id : t -> P2p_peer_id.t option + val of_peerid_state : t -> P2p_peer_id.t option -> t end @@ -55,10 +52,10 @@ module Info : sig greylisted_until : Time.t ; state : State.t ; last_failed_connection : Time.t option ; - last_rejected_connection : (peer_id * Time.t) option ; - last_established_connection : (peer_id * Time.t) option ; - last_disconnection : (peer_id * Time.t) option ; - last_seen : (peer_id * Time.t) option ; + last_rejected_connection : (P2p_peer_id.t * Time.t) option ; + last_established_connection : (P2p_peer_id.t * Time.t) option ; + last_disconnection : (P2p_peer_id.t * Time.t) option ; + last_seen : (P2p_peer_id.t * Time.t) option ; last_miss : Time.t option ; } @@ -66,127 +63,22 @@ module Info : sig end -module Pool_info : sig - - type 'conn t - type 'conn point_info = 'conn t - (** Type of info associated to a point. *) - - val compare : 'conn point_info -> 'conn point_info -> int - - type greylisting_config = { - factor: float ; - initial_delay: int ; - disconnection_delay: int ; - } - - val create : - ?trusted:bool -> - ?greylisting_config:greylisting_config -> - P2p_addr.t -> P2p_addr.port -> 'conn point_info - (** [create ~trusted addr port] is a freshly minted point_info. If - [trusted] is true, this point is considered trusted and will - be treated as such. *) - - val trusted : 'conn point_info -> bool - (** [trusted pi] is [true] iff [pi] has is trusted, - i.e. "whitelisted". *) - - val set_trusted : 'conn point_info -> unit - val unset_trusted : 'conn point_info -> unit - - val last_failed_connection : - 'conn point_info -> Time.t option - val last_rejected_connection : - 'conn point_info -> (peer_id * Time.t) option - val last_established_connection : - 'conn point_info -> (peer_id * Time.t) option - val last_disconnection : - 'conn point_info -> (peer_id * Time.t) option - - val last_seen : - 'conn point_info -> (peer_id * Time.t) option - (** [last_seen pi] is the most recent of: - - * last established connection - * last rejected connection - * last disconnection - *) - - val last_miss : - 'conn point_info -> Time.t option - (** [last_miss pi] is the most recent of: - - * last failed connection - * last rejected connection - * last disconnection - *) - - val greylisted : - ?now:Time.t -> 'conn point_info -> bool - - val greylisted_until : 'conn point_info -> Time.t - - val point : 'conn point_info -> Id.t - - val log_incoming_rejection : - ?timestamp:Time.t -> 'conn point_info -> peer_id -> unit - -end - -module Pool_state : sig - - type 'conn t = - | Requested of { cancel: Lwt_canceler.t } - (** We initiated a connection. *) - | Accepted of { current_peer_id: peer_id ; - cancel: Lwt_canceler.t } - (** We accepted a incoming connection. *) - | Running of { data: 'conn ; - current_peer_id: peer_id } - (** Successfully authentificated connection, normal business. *) - | Disconnected - (** No connection established currently. *) - type 'conn state = 'conn t - - val pp : Format.formatter -> 'conn t -> unit - - val get : 'conn Pool_info.t -> 'conn state - - val is_disconnected : 'conn Pool_info.t -> bool - - val set_requested : - ?timestamp:Time.t -> - 'conn Pool_info.t -> Lwt_canceler.t -> unit - - val set_accepted : - ?timestamp:Time.t -> - 'conn Pool_info.t -> peer_id -> Lwt_canceler.t -> unit - - val set_running : - ?timestamp:Time.t -> 'conn Pool_info.t -> peer_id -> 'conn -> unit - - val set_disconnected : - ?timestamp:Time.t -> ?requested:bool -> 'conn Pool_info.t -> unit - -end - module Pool_event : sig type kind = | Outgoing_request (** We initiated a connection. *) - | Accepting_request of peer_id + | Accepting_request of P2p_peer_id.t (** We accepted a connection after authentifying the remote peer. *) - | Rejecting_request of peer_id + | Rejecting_request of P2p_peer_id.t (** We rejected a connection after authentifying the remote peer. *) - | Request_rejected of peer_id option + | Request_rejected of P2p_peer_id.t option (** The remote peer rejected our connection. *) - | Connection_established of peer_id + | Connection_established of P2p_peer_id.t (** We succesfully established a authentified connection. *) - | Disconnection of peer_id + | Disconnection of P2p_peer_id.t (** We decided to close the connection. *) - | External_disconnection of peer_id + | External_disconnection of P2p_peer_id.t (** The connection was closed for external reason. *) type t = { @@ -196,12 +88,6 @@ module Pool_event : sig val encoding : t Data_encoding.t - val fold : - 'conn Pool_info.t -> init:'a -> f:('a -> t -> 'a) -> 'a - - val watch : - 'conn Pool_info.t -> t Lwt_stream.t * Lwt_watcher.stopper - end diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 184292a91..b2de18f9a 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -516,12 +516,12 @@ module RPC = struct open P2p_point.State let info_of_point_info i = - let state = match P2p_point.Pool_state.get i with + let state = match P2p_point_state.get i with | Requested _ -> Requested | Accepted { current_peer_id ; _ } -> Accepted current_peer_id | Running { current_peer_id ; _ } -> Running current_peer_id | Disconnected -> Disconnected in - P2p_point.Pool_info.{ + P2p_point_state.Info.{ trusted = trusted i ; state ; greylisted_until = greylisted_until i ; @@ -550,7 +550,7 @@ module RPC = struct ~default:[] ~f:begin fun pi -> let evts = - P2p_point.Pool_event.fold + P2p_point_state.Info.fold pi ~init:[] ~f:(fun a e -> e :: a) in (if rev then List.rev_sub else List.sub) evts max end @@ -561,7 +561,7 @@ module RPC = struct | Some pool -> match P2p_pool.Points.info pool point with | None -> raise Not_found - | Some pi -> P2p_point.Pool_event.watch pi + | Some pi -> P2p_point_state.Info.watch pi let list ?(restrict=[]) net = match net.pool with @@ -585,18 +585,18 @@ module RPC = struct open P2p_peer.State let info_of_peer_info pool i = - let state, id_point = match P2p_peer.Pool_state.get i with + let state, id_point = match P2p_peer_state.get i with | Accepted { current_point } -> Accepted, Some current_point | Running { current_point } -> Running, Some current_point | Disconnected -> Disconnected, None in - let peer_id = P2p_peer.Pool_info.peer_id i in + let peer_id = P2p_peer_state.Info.peer_id i in let score = P2p_pool.Peers.get_score pool peer_id in let stat = match P2p_pool.Connection.find_by_peer_id pool peer_id with | None -> P2p_stat.empty | Some conn -> P2p_pool.Connection.stat conn - in P2p_peer.Pool_info.{ + in P2p_peer_state.Info.{ score ; trusted = trusted i ; state ; @@ -627,7 +627,7 @@ module RPC = struct (P2p_pool.Peers.info pool peer_id) ~default:[] ~f:begin fun gi -> - let evts = P2p_peer.Pool_event.fold gi + let evts = P2p_peer_state.Info.fold gi ~init:[] ~f:(fun a e -> e :: a) in (if rev then List.rev_sub else List.sub) evts max end @@ -638,7 +638,7 @@ module RPC = struct | Some pool -> match P2p_pool.Peers.info pool peer_id with | None -> raise Not_found - | Some gi -> P2p_peer.Pool_event.watch gi + | Some gi -> P2p_peer_state.Info.watch gi let list ?(restrict=[]) net = match net.pool with diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index ecb74b204..56d0f327f 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -49,11 +49,11 @@ let connectable st start_time expected = let acc = Bounded_point_info.create expected in P2p_pool.Points.fold_known pool ~init:() ~f:begin fun point pi () -> - match P2p_point.Pool_state.get pi with + match P2p_point_state.get pi with | Disconnected -> begin - match P2p_point.Pool_info.last_miss pi with + match P2p_point_state.Info.last_miss pi with | Some last when Time.(start_time < last) - || P2p_point.Pool_info.greylisted ~now pi -> () + || P2p_point_state.Info.greylisted ~now pi -> () | last -> Bounded_point_info.insert (last, point) acc end diff --git a/src/lib_p2p/p2p_peer_state.ml b/src/lib_p2p/p2p_peer_state.ml new file mode 100644 index 000000000..ae21f270c --- /dev/null +++ b/src/lib_p2p/p2p_peer_state.ml @@ -0,0 +1,201 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_peer + +type 'data t = + | Accepted of { current_point: P2p_connection.Id.t ; + cancel: Lwt_canceler.t } + | Running of { data: 'data ; + current_point: P2p_connection.Id.t } + | Disconnected +type 'data state = 'data t + +let pp ppf = function + | Accepted { current_point ; _ } -> + Format.fprintf ppf "accepted %a" P2p_connection.Id.pp current_point + | Running { current_point ; _ } -> + Format.fprintf ppf "running %a" P2p_connection.Id.pp current_point + | Disconnected -> + Format.fprintf ppf "disconnected" + +module Info = struct + + type ('conn, 'meta) t = { + peer_id : Id.t ; + created : Time.t ; + mutable state : 'conn state ; + mutable metadata : 'meta ; + mutable trusted : bool ; + mutable last_failed_connection : (P2p_connection.Id.t * Time.t) option ; + mutable last_rejected_connection : (P2p_connection.Id.t * Time.t) option ; + mutable last_established_connection : (P2p_connection.Id.t * Time.t) option ; + mutable last_disconnection : (P2p_connection.Id.t * Time.t) option ; + events : Pool_event.t Ring.t ; + watchers : Pool_event.t Lwt_watcher.input ; + } + type ('conn, 'meta) peer_info = ('conn, 'meta) t + + let compare gi1 gi2 = Id.compare gi1.peer_id gi2.peer_id + + let log_size = 100 + + let create ?(created = Time.now ()) ?(trusted = false) ~metadata peer_id = + { peer_id ; + created ; + state = Disconnected ; + metadata ; + trusted ; + last_failed_connection = None ; + last_rejected_connection = None ; + last_established_connection = None ; + last_disconnection = None ; + events = Ring.create log_size ; + watchers = Lwt_watcher.create_input () ; + } + + let encoding metadata_encoding = + let open Data_encoding in + conv + (fun { peer_id ; trusted ; metadata ; events ; created ; + last_failed_connection ; last_rejected_connection ; + last_established_connection ; last_disconnection ; _ } -> + (peer_id, created, trusted, metadata, Ring.elements events, + last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection)) + (fun (peer_id, created, trusted, metadata, event_list, + last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection) -> + let info = create ~trusted ~metadata peer_id in + let events = Ring.create log_size in + Ring.add_list info.events event_list ; + { state = Disconnected ; + trusted ; peer_id ; metadata ; created ; + last_failed_connection ; + last_rejected_connection ; + last_established_connection ; + last_disconnection ; + events ; + watchers = Lwt_watcher.create_input () ; + }) + (obj9 + (req "peer_id" Id.encoding) + (req "created" Time.encoding) + (dft "trusted" bool false) + (req "metadata" metadata_encoding) + (dft "events" (list Pool_event.encoding) []) + (opt "last_failed_connection" + (tup2 P2p_connection.Id.encoding Time.encoding)) + (opt "last_rejected_connection" + (tup2 P2p_connection.Id.encoding Time.encoding)) + (opt "last_established_connection" + (tup2 P2p_connection.Id.encoding Time.encoding)) + (opt "last_disconnection" + (tup2 P2p_connection.Id.encoding Time.encoding))) + + let peer_id { peer_id ; _ } = peer_id + let created { created ; _ } = created + let metadata { metadata ; _ } = metadata + let set_metadata gi metadata = gi.metadata <- metadata + let trusted { trusted ; _ } = trusted + let set_trusted gi = gi.trusted <- true + let unset_trusted gi = gi.trusted <- false + let last_established_connection s = s.last_established_connection + let last_disconnection s = s.last_disconnection + let last_failed_connection s = s.last_failed_connection + let last_rejected_connection s = s.last_rejected_connection + + let last_seen s = + Time.recent + s.last_established_connection + (Time.recent s.last_rejected_connection s.last_disconnection) + let last_miss s = + Time.recent + s.last_failed_connection + (Time.recent s.last_rejected_connection s.last_disconnection) + + let log { events ; watchers ; _ } ?(timestamp = Time.now ()) point kind = + let event = { Pool_event.kind ; timestamp ; point } in + Ring.add events event ; + Lwt_watcher.notify watchers event + + let log_incoming_rejection ?timestamp peer_info point = + log peer_info ?timestamp point Rejecting_request + + module File = struct + + let load path metadata_encoding = + let enc = Data_encoding.list (encoding metadata_encoding) in + if path <> "/dev/null" && Sys.file_exists path then + Data_encoding_ezjsonm.read_file path >>=? fun json -> + return (Data_encoding.Json.destruct enc json) + else + return [] + + let save path metadata_encoding peers = + let open Data_encoding in + Data_encoding_ezjsonm.write_file path @@ + Json.construct (list (encoding metadata_encoding)) peers + + end + + let watch { watchers ; _ } = Lwt_watcher.create_stream watchers + let fold { events ; _ } ~init ~f = Ring.fold events ~init ~f + +end + +let get { Info.state ; _ } = state + +let is_disconnected { Info.state ; _ } = + match state with + | Disconnected -> true + | Accepted _ | Running _ -> false + +let set_accepted + ?(timestamp = Time.now ()) + peer_info current_point cancel = + assert begin + match peer_info.Info.state with + | Accepted _ | Running _ -> false + | Disconnected -> true + end ; + peer_info.state <- Accepted { current_point ; cancel } ; + Info.log peer_info ~timestamp current_point Accepting_request + +let set_running + ?(timestamp = Time.now ()) + peer_info point data = + assert begin + match peer_info.Info.state with + | Disconnected -> true (* request to unknown peer_id. *) + | Running _ -> false + | Accepted { current_point ; _ } -> + P2p_connection.Id.equal point current_point + end ; + peer_info.state <- Running { data ; current_point = point } ; + peer_info.last_established_connection <- Some (point, timestamp) ; + Info.log peer_info ~timestamp point Connection_established + +let set_disconnected + ?(timestamp = Time.now ()) ?(requested = false) peer_info = + let current_point, (event : Pool_event.kind) = + match peer_info.Info.state with + | Accepted { current_point ; _ } -> + peer_info.last_rejected_connection <- + Some (current_point, timestamp) ; + current_point, Request_rejected + | Running { current_point ; _ } -> + peer_info.last_disconnection <- + Some (current_point, timestamp) ; + current_point, + if requested then Disconnection else External_disconnection + | Disconnected -> assert false + in + peer_info.state <- Disconnected ; + Info.log peer_info ~timestamp current_point event diff --git a/src/lib_p2p/p2p_peer_state.mli b/src/lib_p2p/p2p_peer_state.mli new file mode 100644 index 000000000..85c8ecd52 --- /dev/null +++ b/src/lib_p2p/p2p_peer_state.mli @@ -0,0 +1,115 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_peer + +type 'conn t = + | Accepted of { current_point: P2p_connection.Id.t ; + cancel: Lwt_canceler.t } + (** We accepted a incoming connection, we greeted back and + we are waiting for an acknowledgement. *) + | Running of { data: 'conn ; + current_point: P2p_connection.Id.t } + (** Successfully authentificated connection, normal business. *) + | Disconnected + (** No connection established currently. *) +type 'conn state = 'conn t + +val pp : Format.formatter -> 'conn t -> unit + +module Info : sig + + type ('conn, 'meta) t + type ('conn, 'meta) peer_info = ('conn, 'meta) t + + val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int + + val create : + ?created:Time.t -> + ?trusted:bool -> + metadata:'meta -> + Id.t -> ('conn, 'meta) peer_info + (** [create ~trusted ~meta peer_id] is a freshly minted peer_id info for + [peer_id]. *) + + val peer_id : ('conn, 'meta) peer_info -> Id.t + + val created : ('conn, 'meta) peer_info -> Time.t + val metadata : ('conn, 'meta) peer_info -> 'meta + val set_metadata : ('conn, 'meta) peer_info -> 'meta -> unit + + val trusted : ('conn, 'meta) peer_info -> bool + val set_trusted : ('conn, 'meta) peer_info -> unit + val unset_trusted : ('conn, 'meta) peer_info -> unit + + val last_failed_connection : + ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + val last_rejected_connection : + ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + val last_established_connection : + ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + val last_disconnection : + ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + + val last_seen : + ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + (** [last_seen gi] is the most recent of: + + * last established connection + * last rejected connection + * last disconnection + *) + + val last_miss : + ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + (** [last_miss gi] is the most recent of: + + * last failed connection + * last rejected connection + * last disconnection + *) + + val log_incoming_rejection : + ?timestamp:Time.t -> + ('conn, 'meta) peer_info -> P2p_connection.Id.t -> unit + + module File : sig + val load : + string -> 'meta Data_encoding.t -> + ('conn, 'meta) peer_info list tzresult Lwt.t + val save : + string -> 'meta Data_encoding.t -> + ('conn, 'meta) peer_info list -> unit tzresult Lwt.t + end + + val fold : + ('conn, 'meta) t -> init:'a -> f:('a -> Pool_event.t -> 'a) -> 'a + + val watch : + ('conn, 'meta) t -> Pool_event.t Lwt_stream.t * Lwt_watcher.stopper + +end + + +val get : ('conn, 'meta) Info.t -> 'conn state + +val is_disconnected : ('conn, 'meta) Info.t -> bool + +val set_accepted : + ?timestamp:Time.t -> + ('conn, 'meta) Info.t -> P2p_connection.Id.t -> Lwt_canceler.t -> unit + +val set_running : + ?timestamp:Time.t -> + ('conn, 'meta) Info.t -> P2p_connection.Id.t -> 'conn -> unit + +val set_disconnected : + ?timestamp:Time.t -> + ?requested:bool -> + ('conn, 'meta) Info.t -> unit diff --git a/src/lib_p2p/p2p_point_state.ml b/src/lib_p2p/p2p_point_state.ml new file mode 100644 index 000000000..e71fe8235 --- /dev/null +++ b/src/lib_p2p/p2p_point_state.ml @@ -0,0 +1,201 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_point + +type 'data t = + | Requested of { cancel: Lwt_canceler.t } + | Accepted of { current_peer_id: P2p_peer.Id.t ; + cancel: Lwt_canceler.t } + | Running of { data: 'data ; + current_peer_id: P2p_peer.Id.t } + | Disconnected +type 'data state = 'data t + +let pp ppf = function + | Requested _ -> + Format.fprintf ppf "requested" + | Accepted { current_peer_id ; _ } -> + Format.fprintf ppf "accepted %a" P2p_peer.Id.pp current_peer_id + | Running { current_peer_id ; _ } -> + Format.fprintf ppf "running %a" P2p_peer.Id.pp current_peer_id + | Disconnected -> + Format.fprintf ppf "disconnected" + +module Info = struct + + type greylisting_config = { + factor: float ; + initial_delay: int ; + disconnection_delay: int ; + } + + type 'data t = { + point : Id.t ; + mutable trusted : bool ; + mutable state : 'data state ; + mutable last_failed_connection : Time.t option ; + mutable last_rejected_connection : (P2p_peer.Id.t * Time.t) option ; + mutable last_established_connection : (P2p_peer.Id.t * Time.t) option ; + mutable last_disconnection : (P2p_peer.Id.t * Time.t) option ; + greylisting : greylisting_config ; + mutable greylisting_delay : float ; + mutable greylisting_end : Time.t ; + events : Pool_event.t Ring.t ; + watchers : Pool_event.t Lwt_watcher.input ; + } + type 'data point_info = 'data t + + let compare pi1 pi2 = Id.compare pi1.point pi2.point + + let log_size = 100 + + let default_greylisting_config = { + factor = 1.2 ; + initial_delay = 1 ; + disconnection_delay = 60 ; + } + + let create + ?(trusted = false) + ?(greylisting_config = default_greylisting_config) addr port = { + point = (addr, port) ; + trusted ; + state = Disconnected ; + last_failed_connection = None ; + last_rejected_connection = None ; + last_established_connection = None ; + last_disconnection = None ; + events = Ring.create log_size ; + greylisting = greylisting_config ; + greylisting_delay = 1. ; + greylisting_end = Time.epoch ; + watchers = Lwt_watcher.create_input () ; + } + + let point s = s.point + let trusted s = s.trusted + let set_trusted gi = gi.trusted <- true + let unset_trusted gi = gi.trusted <- false + let last_established_connection s = s.last_established_connection + let last_disconnection s = s.last_disconnection + let last_failed_connection s = s.last_failed_connection + let last_rejected_connection s = s.last_rejected_connection + let greylisted ?(now = Time.now ()) s = + Time.compare now s.greylisting_end <= 0 + let greylisted_until s = s.greylisting_end + + let last_seen s = + Time.recent s.last_rejected_connection + (Time.recent s.last_established_connection s.last_disconnection) + let last_miss s = + match + s.last_failed_connection, + (Option.map ~f:(fun (_, time) -> time) @@ + Time.recent s.last_rejected_connection s.last_disconnection) with + | (None, None) -> None + | (None, (Some _ as a)) + | (Some _ as a, None) -> a + | (Some t1 as a1 , (Some t2 as a2)) -> + if Time.compare t1 t2 < 0 then a2 else a1 + + let log { events ; watchers ; _ } ?(timestamp = Time.now ()) kind = + let event = { Pool_event.kind ; timestamp } in + Ring.add events event ; + Lwt_watcher.notify watchers event + + let log_incoming_rejection ?timestamp point_info peer_id = + log point_info ?timestamp (Rejecting_request peer_id) + + + let fold { events ; _ } ~init ~f = Ring.fold events ~init ~f + + let watch { watchers ; _ } = Lwt_watcher.create_stream watchers + +end + +let get { Info.state ; _ } = state + +let is_disconnected { Info.state ; _ } = + match state with + | Disconnected -> true + | Requested _ | Accepted _ | Running _ -> false + +let set_requested ?timestamp point_info cancel = + assert begin + match point_info.Info.state with + | Requested _ -> true + | Accepted _ | Running _ -> false + | Disconnected -> true + end ; + point_info.state <- Requested { cancel } ; + Info.log point_info ?timestamp Outgoing_request + +let set_accepted + ?(timestamp = Time.now ()) + point_info current_peer_id cancel = + (* log_notice "SET_ACCEPTED %a@." P2p_point.pp point_info.point ; *) + assert begin + match point_info.Info.state with + | Accepted _ | Running _ -> false + | Requested _ | Disconnected -> true + end ; + point_info.state <- Accepted { current_peer_id ; cancel } ; + Info.log point_info ~timestamp (Accepting_request current_peer_id) + +let set_running + ?(timestamp = Time.now ()) + point_info peer_id data = + assert begin + match point_info.Info.state with + | Disconnected -> true (* request to unknown peer_id. *) + | Running _ -> false + | Accepted { current_peer_id ; _ } -> P2p_peer.Id.equal peer_id current_peer_id + | Requested _ -> true + end ; + point_info.state <- Running { data ; current_peer_id = peer_id } ; + point_info.last_established_connection <- Some (peer_id, timestamp) ; + Info.log point_info ~timestamp (Connection_established peer_id) + +let set_greylisted timestamp point_info = + point_info.Info.greylisting_end <- + Time.add + timestamp + (Int64.of_float point_info.Info.greylisting_delay) ; + point_info.greylisting_delay <- + point_info.greylisting_delay *. point_info.greylisting.factor + +let set_disconnected + ?(timestamp = Time.now ()) ?(requested = false) point_info = + let event : Pool_event.kind = + match point_info.Info.state with + | Requested _ -> + set_greylisted timestamp point_info ; + point_info.last_failed_connection <- Some timestamp ; + Request_rejected None + | Accepted { current_peer_id ; _ } -> + set_greylisted timestamp point_info ; + point_info.last_rejected_connection <- + Some (current_peer_id, timestamp) ; + Request_rejected (Some current_peer_id) + | Running { current_peer_id ; _ } -> + point_info.greylisting_delay <- + float_of_int point_info.greylisting.initial_delay ; + point_info.greylisting_end <- + Time.add timestamp + (Int64.of_int point_info.greylisting.disconnection_delay) ; + point_info.last_disconnection <- Some (current_peer_id, timestamp) ; + if requested + then Disconnection current_peer_id + else External_disconnection current_peer_id + | Disconnected -> + assert false + in + point_info.state <- Disconnected ; + Info.log point_info ~timestamp event diff --git a/src/lib_p2p/p2p_point_state.mli b/src/lib_p2p/p2p_point_state.mli new file mode 100644 index 000000000..f79ffa0f8 --- /dev/null +++ b/src/lib_p2p/p2p_point_state.mli @@ -0,0 +1,117 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_point + +type 'conn t = + | Requested of { cancel: Lwt_canceler.t } + (** We initiated a connection. *) + | Accepted of { current_peer_id: P2p_peer.Id.t ; + cancel: Lwt_canceler.t } + (** We accepted a incoming connection. *) + | Running of { data: 'conn ; + current_peer_id: P2p_peer.Id.t } + (** Successfully authentificated connection, normal business. *) + | Disconnected + (** No connection established currently. *) +type 'conn state = 'conn t + +val pp : Format.formatter -> 'conn t -> unit + +module Info : sig + + type 'conn t + type 'conn point_info = 'conn t + (** Type of info associated to a point. *) + + val compare : 'conn point_info -> 'conn point_info -> int + + type greylisting_config = { + factor: float ; + initial_delay: int ; + disconnection_delay: int ; + } + + val create : + ?trusted:bool -> + ?greylisting_config:greylisting_config -> + P2p_addr.t -> P2p_addr.port -> 'conn point_info + (** [create ~trusted addr port] is a freshly minted point_info. If + [trusted] is true, this point is considered trusted and will + be treated as such. *) + + val trusted : 'conn point_info -> bool + (** [trusted pi] is [true] iff [pi] has is trusted, + i.e. "whitelisted". *) + + val set_trusted : 'conn point_info -> unit + val unset_trusted : 'conn point_info -> unit + + val last_failed_connection : + 'conn point_info -> Time.t option + val last_rejected_connection : + 'conn point_info -> (P2p_peer.Id.t * Time.t) option + val last_established_connection : + 'conn point_info -> (P2p_peer.Id.t * Time.t) option + val last_disconnection : + 'conn point_info -> (P2p_peer.Id.t * Time.t) option + + val last_seen : + 'conn point_info -> (P2p_peer.Id.t * Time.t) option + (** [last_seen pi] is the most recent of: + + * last established connection + * last rejected connection + * last disconnection + *) + + val last_miss : + 'conn point_info -> Time.t option + (** [last_miss pi] is the most recent of: + + * last failed connection + * last rejected connection + * last disconnection + *) + + val greylisted : + ?now:Time.t -> 'conn point_info -> bool + + val greylisted_until : 'conn point_info -> Time.t + + val point : 'conn point_info -> Id.t + + val log_incoming_rejection : + ?timestamp:Time.t -> 'conn point_info -> P2p_peer.Id.t -> unit + + val fold : + 'conn t -> init:'a -> f:('a -> Pool_event.t -> 'a) -> 'a + + val watch : + 'conn t -> Pool_event.t Lwt_stream.t * Lwt_watcher.stopper +end + +val get : 'conn Info.t -> 'conn t + +val is_disconnected : 'conn Info.t -> bool + +val set_requested : + ?timestamp:Time.t -> + 'conn Info.t -> Lwt_canceler.t -> unit + +val set_accepted : + ?timestamp:Time.t -> + 'conn Info.t -> P2p_peer.Id.t -> Lwt_canceler.t -> unit + +val set_running : + ?timestamp:Time.t -> 'conn Info.t -> P2p_peer.Id.t -> 'conn -> unit + +val set_disconnected : + ?timestamp:Time.t -> ?requested:bool -> 'conn Info.t -> unit + diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 7e3819d1b..92d689f52 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -193,11 +193,11 @@ type ('msg, 'meta) t = { message_config : 'msg message_config ; my_id_points : unit P2p_point.Table.t ; known_peer_ids : - (('msg, 'meta) connection, 'meta) P2p_peer.Pool_info.t P2p_peer.Table.t ; + (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t P2p_peer.Table.t ; connected_peer_ids : - (('msg, 'meta) connection, 'meta) P2p_peer.Pool_info.t P2p_peer.Table.t ; - known_points : ('msg, 'meta) connection P2p_point.Pool_info.t P2p_point.Table.t ; - connected_points : ('msg, 'meta) connection P2p_point.Pool_info.t P2p_point.Table.t ; + (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t P2p_peer.Table.t ; + known_points : ('msg, 'meta) connection P2p_point_state.Info.t P2p_point.Table.t ; + connected_points : ('msg, 'meta) connection P2p_point_state.Info.t P2p_point.Table.t ; incoming : Lwt_canceler.t P2p_point.Table.t ; io_sched : P2p_io_scheduler.t ; encoding : 'msg Message.t Data_encoding.t ; @@ -220,8 +220,8 @@ and ('msg, 'meta) connection = { canceler : Lwt_canceler.t ; messages : (int * 'msg) Lwt_pipe.t ; conn : 'msg Message.t P2p_socket.t ; - peer_info : (('msg, 'meta) connection, 'meta) P2p_peer.Pool_info.t ; - point_info : ('msg, 'meta) connection P2p_point.Pool_info.t option ; + peer_info : (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t ; + point_info : ('msg, 'meta) connection P2p_point_state.Info.t option ; answerer : 'msg Answerer.t Lazy.t ; mutable last_sent_swap_request : (Time.t * P2p_peer.Id.t) option ; mutable wait_close : bool ; @@ -255,9 +255,9 @@ let gc_points ({ config = { max_known_points } ; known_points } as pool) = let now = Time.now () in (* TODO: maybe time of discovery? *) let table = Gc_point_set.create target in P2p_point.Table.iter (fun p point_info -> - if P2p_point.Pool_state.is_disconnected point_info then + if P2p_point_state.is_disconnected point_info then let time = - match P2p_point.Pool_info.last_miss point_info with + match P2p_point_state.Info.last_miss point_info with | None -> now | Some t -> t in Gc_point_set.insert (time, p) table @@ -271,7 +271,7 @@ let gc_points ({ config = { max_known_points } ; known_points } as pool) = let register_point pool ?trusted _source_peer_id (addr, port as point) = match P2p_point.Table.find pool.known_points point with | exception Not_found -> - let point_info = P2p_point.Pool_info.create ?trusted addr port in + let point_info = P2p_point_state.Info.create ?trusted addr port in Option.iter pool.config.max_known_points ~f:begin fun (max, _) -> if P2p_point.Table.length pool.known_points >= max then gc_points pool end ; @@ -309,8 +309,8 @@ let gc_peer_ids ({ meta_config = { score } ; | Some (_, target) -> let table = Gc_peer_set.create target in P2p_peer.Table.iter (fun peer_id peer_info -> - let created = P2p_peer.Pool_info.created peer_info in - let score = score @@ P2p_peer.Pool_info.metadata peer_info in + let created = P2p_peer_state.Info.created peer_info in + let score = score @@ P2p_peer_state.Info.metadata peer_info in Gc_peer_set.insert (score, created, peer_id) table ) known_peer_ids ; let to_remove = Gc_peer_set.get table in @@ -323,7 +323,7 @@ let register_peer pool peer_id = match P2p_peer.Table.find pool.known_peer_ids peer_id with | exception Not_found -> Lwt_condition.broadcast pool.events.new_peer () ; - let peer = P2p_peer.Pool_info.create peer_id ~metadata:pool.meta_config.initial in + let peer = P2p_peer_state.Info.create peer_id ~metadata:pool.meta_config.initial in Option.iter pool.config.max_known_peer_ids ~f:begin fun (max, _) -> if P2p_peer.Table.length pool.known_peer_ids >= max then gc_peer_ids pool end ; @@ -363,7 +363,7 @@ let write_now { conn } msg = let write_all pool msg = P2p_peer.Table.iter (fun _peer_id peer_info -> - match P2p_peer.Pool_state.get peer_info with + match P2p_peer_state.get peer_info with | Running { data = conn } -> ignore (write_now conn msg : bool tzresult ) | _ -> ()) @@ -372,7 +372,7 @@ let write_all pool msg = let broadcast_bootstrap_msg pool = P2p_peer.Table.iter (fun _peer_id peer_info -> - match P2p_peer.Pool_state.get peer_info with + match P2p_peer_state.get peer_info with | Running { data = { conn } } -> ignore (P2p_socket.write_now conn Bootstrap : bool tzresult ) | _ -> ()) @@ -383,32 +383,32 @@ let broadcast_bootstrap_msg pool = module Peers = struct - type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer.Pool_info.t + type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t let info { known_peer_ids } point = try Some (P2p_peer.Table.find known_peer_ids point) with Not_found -> None let get_metadata pool peer_id = - try P2p_peer.Pool_info.metadata (P2p_peer.Table.find pool.known_peer_ids peer_id) + try P2p_peer_state.Info.metadata (P2p_peer.Table.find pool.known_peer_ids peer_id) with Not_found -> pool.meta_config.initial let get_score pool peer_id = pool.meta_config.score (get_metadata pool peer_id) let set_metadata pool peer_id data = - P2p_peer.Pool_info.set_metadata (register_peer pool peer_id) data + P2p_peer_state.Info.set_metadata (register_peer pool peer_id) data let get_trusted pool peer_id = - try P2p_peer.Pool_info.trusted (P2p_peer.Table.find pool.known_peer_ids peer_id) + try P2p_peer_state.Info.trusted (P2p_peer.Table.find pool.known_peer_ids peer_id) with Not_found -> false let set_trusted pool peer_id = - try P2p_peer.Pool_info.set_trusted (register_peer pool peer_id) + try P2p_peer_state.Info.set_trusted (register_peer pool peer_id) with Not_found -> () let unset_trusted pool peer_id = - try P2p_peer.Pool_info.unset_trusted (P2p_peer.Table.find pool.known_peer_ids peer_id) + try P2p_peer_state.Info.unset_trusted (P2p_peer.Table.find pool.known_peer_ids peer_id) with Not_found -> () let fold_known pool ~init ~f = @@ -421,24 +421,24 @@ end module Points = struct - type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point.Pool_info.t + type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t let info { known_points } point = try Some (P2p_point.Table.find known_points point) with Not_found -> None let get_trusted pool point = - try P2p_point.Pool_info.trusted (P2p_point.Table.find pool.known_points point) + try P2p_point_state.Info.trusted (P2p_point.Table.find pool.known_points point) with Not_found -> false let set_trusted pool point = try - P2p_point.Pool_info.set_trusted + P2p_point_state.Info.set_trusted (register_point pool pool.config.identity.peer_id point) with Not_found -> () let unset_trusted pool peer_id = - try P2p_point.Pool_info.unset_trusted (P2p_point.Table.find pool.known_points peer_id) + try P2p_point_state.Info.unset_trusted (P2p_point.Table.find pool.known_points peer_id) with Not_found -> () let fold_known pool ~init ~f = @@ -453,7 +453,7 @@ module Connection = struct let fold pool ~init ~f = Peers.fold_connected pool ~init ~f:begin fun peer_id peer_info acc -> - match P2p_peer.Pool_state.get peer_info with + match P2p_peer_state.get peer_info with | Running { data } -> f peer_id data acc | _ -> acc end @@ -503,7 +503,7 @@ module Connection = struct Option.apply (Peers.info pool peer_id) ~f:(fun p -> - match P2p_peer.Pool_state.get p with + match P2p_peer_state.get p with | Running { data } -> Some data | _ -> None) @@ -511,7 +511,7 @@ module Connection = struct Option.apply (Points.info pool point) ~f:(fun p -> - match P2p_point.Pool_state.get p with + match P2p_point_state.get p with | Running { data } -> Some data | _ -> None) @@ -532,13 +532,13 @@ type error += Closed_network type error += Too_many_connections let fail_unless_disconnected_point point_info = - match P2p_point.Pool_state.get point_info with + match P2p_point_state.get point_info with | Disconnected -> return () | Requested _ | Accepted _ -> fail Pending_connection | Running _ -> fail Connected let fail_unless_disconnected_peer_id peer_info = - match P2p_peer.Pool_state.get peer_info with + match P2p_peer_state.get peer_info with | Disconnected -> return () | Accepted _ -> fail Pending_connection | Running _ -> fail Connected @@ -546,10 +546,10 @@ let fail_unless_disconnected_peer_id peer_info = let compare_known_point_info p1 p2 = (* The most-recently disconnected peers are greater. *) (* Then come long-standing connected peers. *) - let disconnected1 = P2p_point.Pool_state.is_disconnected p1 - and disconnected2 = P2p_point.Pool_state.is_disconnected p2 in + let disconnected1 = P2p_point_state.is_disconnected p1 + and disconnected2 = P2p_point_state.is_disconnected p2 in let compare_last_seen p1 p2 = - match P2p_point.Pool_info.last_seen p1, P2p_point.Pool_info.last_seen p2 with + match P2p_point_state.Info.last_seen p1, P2p_point_state.Info.last_seen p2 with | None, None -> Random.int 2 * 2 - 1 (* HACK... *) | Some _, None -> 1 | None, Some _ -> -1 @@ -571,12 +571,12 @@ let rec connect ~timeout pool point = Lwt_utils.with_timeout ~canceler timeout begin fun canceler -> let point_info = register_point pool pool.config.identity.peer_id point in - let addr, port as point = P2p_point.Pool_info.point point_info in + let addr, port as point = P2p_point_state.Info.point point_info in fail_unless - (not pool.config.closed_network || P2p_point.Pool_info.trusted point_info) + (not pool.config.closed_network || P2p_point_state.Info.trusted point_info) Closed_network >>=? fun () -> fail_unless_disconnected_point point_info >>=? fun () -> - P2p_point.Pool_state.set_requested point_info canceler ; + P2p_point_state.set_requested point_info canceler ; let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in @@ -587,7 +587,7 @@ let rec connect ~timeout pool point = return () end ~on_error: begin fun err -> lwt_debug "connect: %a -> disconnect" P2p_point.Id.pp point >>= fun () -> - P2p_point.Pool_state.set_disconnected point_info ; + P2p_point_state.set_disconnected point_info ; Lwt_utils.safe_close fd >>= fun () -> match err with | [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] -> @@ -629,7 +629,7 @@ and authenticate pool ?point_info canceler fd point = if incoming then P2p_point.Table.remove pool.incoming point else - Option.iter ~f:P2p_point.Pool_state.set_disconnected point_info ; + Option.iter ~f:P2p_point_state.set_disconnected point_info ; Lwt.return (Error err) end >>=? fun (info, auth_fd) -> (* Authentication correct! *) @@ -654,16 +654,16 @@ and authenticate pool ?point_info canceler fd point = Option.unopt_map connection_point_info ~default:(not pool.config.closed_network) ~f:begin fun connection_point_info -> - match P2p_point.Pool_state.get connection_point_info with + match P2p_point_state.get connection_point_info with | Requested _ -> not incoming | Disconnected -> not pool.config.closed_network - || P2p_point.Pool_info.trusted connection_point_info + || P2p_point_state.Info.trusted connection_point_info | Accepted _ | Running _ -> false end in let acceptable_peer_id = - match P2p_peer.Pool_state.get peer_info with + match P2p_peer_state.get peer_info with | Accepted _ -> (* TODO: in some circumstances cancel and accept... *) false @@ -677,8 +677,8 @@ and authenticate pool ?point_info canceler fd point = log pool (Accepting_request (point, info.id_point, info.peer_id)) ; Option.iter connection_point_info ~f:(fun point_info -> - P2p_point.Pool_state.set_accepted point_info info.peer_id canceler) ; - P2p_peer.Pool_state.set_accepted peer_info info.id_point canceler ; + P2p_point_state.set_accepted point_info info.peer_id canceler) ; + P2p_peer_state.set_accepted peer_info info.id_point canceler ; lwt_debug "authenticate: %a -> accept %a" P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> @@ -700,12 +700,12 @@ and authenticate pool ?point_info canceler fd point = P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> Option.iter connection_point_info - ~f:P2p_point.Pool_state.set_disconnected ; - P2p_peer.Pool_state.set_disconnected peer_info ; + ~f:P2p_point_state.set_disconnected ; + P2p_peer_state.set_disconnected peer_info ; Lwt.return (Error err) end >>=? fun conn -> let id_point = - match info.id_point, Option.map ~f:P2p_point.Pool_info.point point_info with + match info.id_point, Option.map ~f:P2p_point_state.Info.point point_info with | (addr, _), Some (_, port) -> addr, Some port | id_point, None -> id_point in return @@ -721,14 +721,14 @@ and authenticate pool ?point_info canceler fd point = acceptable_point acceptable_peer_id >>= fun () -> P2p_socket.kick auth_fd >>= fun () -> if not incoming then begin - Option.iter ~f:P2p_point.Pool_state.set_disconnected point_info ; - (* FIXME P2p_peer.Pool_state.set_disconnected ~requested:true peer_info ; *) + Option.iter ~f:P2p_point_state.set_disconnected point_info ; + (* FIXME P2p_peer_state.set_disconnected ~requested:true peer_info ; *) end ; fail (Rejected info.peer_id) end and create_connection pool p2p_conn id_point point_info peer_info _version = - let peer_id = P2p_peer.Pool_info.peer_id peer_info in + let peer_id = P2p_peer_state.Info.peer_id peer_info in let canceler = Lwt_canceler.create () in let size = Option.map pool.config.incoming_app_message_queue_size @@ -754,22 +754,22 @@ and create_connection pool p2p_conn id_point point_info peer_info _version = last_sent_swap_request = None } in ignore (Lazy.force answerer) ; Option.iter point_info ~f:begin fun point_info -> - let point = P2p_point.Pool_info.point point_info in - P2p_point.Pool_state.set_running point_info peer_id conn ; + let point = P2p_point_state.Info.point point_info in + P2p_point_state.set_running point_info peer_id conn ; P2p_point.Table.add pool.connected_points point point_info ; end ; log pool (Connection_established (id_point, peer_id)) ; - P2p_peer.Pool_state.set_running peer_info id_point conn ; + P2p_peer_state.set_running peer_info id_point conn ; P2p_peer.Table.add pool.connected_peer_ids peer_id peer_info ; Lwt_condition.broadcast pool.events.new_connection () ; Lwt_canceler.on_cancel canceler begin fun () -> lwt_debug "Disconnect: %a (%a)" P2p_peer.Id.pp peer_id P2p_connection.Id.pp id_point >>= fun () -> - Option.iter ~f:P2p_point.Pool_state.set_disconnected point_info ; + Option.iter ~f:P2p_point_state.set_disconnected point_info ; log pool (Disconnection peer_id) ; - P2p_peer.Pool_state.set_disconnected peer_info ; + P2p_peer_state.set_disconnected peer_info ; Option.iter point_info ~f:begin fun point_info -> - P2p_point.Table.remove pool.connected_points (P2p_point.Pool_info.point point_info) ; + P2p_point.Table.remove pool.connected_points (P2p_point_state.Info.point point_info) ; end ; P2p_peer.Table.remove pool.connected_peer_ids peer_id ; if pool.config.max_connections <= active_connections pool then begin @@ -791,7 +791,7 @@ and disconnect ?(wait = false) conn = Answerer.shutdown (Lazy.force conn.answerer) and register_new_points pool conn = - let source_peer_id = P2p_peer.Pool_info.peer_id conn.peer_info in + let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in fun points -> List.iter (register_new_point pool source_peer_id) points ; Lwt.return_unit @@ -807,12 +807,12 @@ and list_known_points pool _conn () = pool.known_points [] in let best_knowns = List.take_n ~compare:compare_known_point_info 50 knowns in - Lwt.return (List.map P2p_point.Pool_info.point best_knowns) + Lwt.return (List.map P2p_point_state.Info.point best_knowns) and active_connections pool = P2p_peer.Table.length pool.connected_peer_ids and swap_request pool conn new_point _new_peer_id = - let source_peer_id = P2p_peer.Pool_info.peer_id conn.peer_info in + let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in log pool (Swap_request_received { source = source_peer_id }) ; lwt_log_info "Swap request received from %a" P2p_peer.Id.pp source_peer_id >>= fun () -> @@ -825,7 +825,7 @@ and swap_request pool conn new_point _new_peer_id = (Time.max pool.latest_succesfull_swap pool.latest_accepted_swap) in let new_point_info = register_point pool source_peer_id new_point in if span_since_last_swap < int_of_float pool.config.swap_linger - || not (P2p_point.Pool_state.is_disconnected new_point_info) then begin + || not (P2p_point_state.is_disconnected new_point_info) then begin log pool (Swap_request_ignored { source = source_peer_id }) ; lwt_log_info "Ignoring swap request from %a" P2p_peer.Id.pp source_peer_id end else begin @@ -849,7 +849,7 @@ and swap_request pool conn new_point _new_peer_id = end and swap_ack pool conn new_point _new_peer_id = - let source_peer_id = P2p_peer.Pool_info.peer_id conn.peer_info in + let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in log pool (Swap_ack_received { source = source_peer_id }) ; lwt_log_info "Swap ack received from %a" P2p_peer.Id.pp source_peer_id >>= fun () -> @@ -864,7 +864,7 @@ and swap_ack pool conn new_point _new_peer_id = Lwt.return_unit and swap pool conn current_peer_id new_point = - let source_peer_id = P2p_peer.Pool_info.peer_id conn.peer_info in + let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in pool.latest_accepted_swap <- Time.now () ; connect ~timeout:10. pool new_point >>= function | Ok _new_conn -> begin @@ -943,11 +943,11 @@ let create config meta_config message_config io_sched = latest_succesfull_swap = Time.epoch ; } in List.iter (Points.set_trusted pool) config.trusted_points ; - P2p_peer.Pool_info.File.load config.peers_file meta_config.encoding >>= function + P2p_peer_state.Info.File.load config.peers_file meta_config.encoding >>= function | Ok peer_ids -> List.iter (fun peer_info -> - let peer_id = P2p_peer.Pool_info.peer_id peer_info in + let peer_id = P2p_peer_state.Info.peer_id peer_info in P2p_peer.Table.add pool.known_peer_ids peer_id peer_info) peer_ids ; Lwt.return pool @@ -958,7 +958,7 @@ let create config meta_config message_config io_sched = let destroy pool = P2p_point.Table.fold (fun _point point_info acc -> - match P2p_point.Pool_state.get point_info with + match P2p_point_state.get point_info with | Requested { cancel } | Accepted { cancel } -> Lwt_canceler.cancel cancel >>= fun () -> acc | Running { data = conn } -> @@ -966,7 +966,7 @@ let destroy pool = | Disconnected -> acc) pool.known_points @@ P2p_peer.Table.fold (fun _peer_id peer_info acc -> - match P2p_peer.Pool_state.get peer_info with + match P2p_peer_state.get peer_info with | Accepted { cancel } -> Lwt_canceler.cancel cancel >>= fun () -> acc | Running { data = conn } -> diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index 43b1fcd31..0c6e7fe98 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -276,7 +276,7 @@ val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit module Peers : sig - type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer.Pool_info.t + type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t val info: ('msg, 'meta) pool -> P2p_peer.Id.t -> ('msg, 'meta) info option @@ -307,7 +307,7 @@ end module Points : sig - type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point.Pool_info.t + type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t val info: ('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) info option @@ -349,3 +349,4 @@ module Message : sig val encoding: 'msg encoding list -> 'msg t Data_encoding.t end + diff --git a/src/lib_shell_services/p2p_services.ml b/src/lib_shell_services/p2p_services.ml index 375d5ee6e..549dd7605 100644 --- a/src/lib_shell_services/p2p_services.ml +++ b/src/lib_shell_services/p2p_services.ml @@ -7,9 +7,6 @@ (* *) (**************************************************************************) -let (peer_id_arg : P2p_peer.Id.t RPC_arg.arg) = - Crypto_box.Public_key_hash.rpc_arg - let point_arg = RPC_arg.make ~name:"point" @@ -74,7 +71,7 @@ module Connection = struct ~output: (Data_encoding.option P2p_connection.Info.encoding) ~error: Data_encoding.empty ~description:"Details about the current P2P connection to the given peer." - RPC_path.(root / "network" / "connection" /: peer_id_arg) + RPC_path.(root / "network" / "connection" /: P2p_peer.Id.rpc_arg) let kick = RPC_service.post_service @@ -83,7 +80,7 @@ module Connection = struct ~output: Data_encoding.empty ~error: Data_encoding.empty ~description:"Forced close of the current P2P connection to the given peer." - RPC_path.(root / "network" / "connection" /: peer_id_arg / "kick") + RPC_path.(root / "network" / "connection" /: P2p_peer.Id.rpc_arg / "kick") end @@ -135,7 +132,7 @@ module Peer_id = struct ~output: (Data_encoding.option P2p_peer.Info.encoding) ~error: Data_encoding.empty ~description:"Details about a given peer." - RPC_path.(root / "network" / "peer_id" /: peer_id_arg) + RPC_path.(root / "network" / "peer_id" /: P2p_peer.Id.rpc_arg) let events = RPC_service.post_service @@ -145,7 +142,7 @@ module Peer_id = struct P2p_peer.Pool_event.encoding) ~error: Data_encoding.empty ~description:"Monitor network events related to a given peer." - RPC_path.(root / "network" / "peer_id" /: peer_id_arg / "log") + RPC_path.(root / "network" / "peer_id" /: P2p_peer.Id.rpc_arg / "log") let list = let filter = diff --git a/src/lib_stdlib_lwt/test/test_lwt_pipe.ml b/src/lib_stdlib_lwt/test/test_lwt_pipe.ml index 375f85626..3d80408a9 100644 --- a/src/lib_stdlib_lwt/test/test_lwt_pipe.ml +++ b/src/lib_stdlib_lwt/test/test_lwt_pipe.ml @@ -8,18 +8,19 @@ (**************************************************************************) open Lwt.Infix -include Logging.Make (struct let name = "test-pipe" end) let rec producer queue = function | 0 -> - lwt_log_notice "Done producing." + Format.eprintf "Done producing." ; + Lwt.return_unit | n -> Lwt_pipe.push queue () >>= fun () -> producer queue (pred n) let rec consumer queue = function | 0 -> - lwt_log_notice "Done consuming." + Format.eprintf "Done consuming." ; + Lwt.return_unit | n -> Lwt_pipe.pop queue >>= fun _ -> consumer queue (pred n) diff --git a/vendors/ocaml-blake2/src/blake2.ml b/vendors/ocaml-blake2/src/blake2.ml index 239f9f800..d3f9bcbe1 100644 --- a/vendors/ocaml-blake2/src/blake2.ml +++ b/vendors/ocaml-blake2/src/blake2.ml @@ -42,12 +42,12 @@ module Blake2b = struct invalid_arg "Blake2b.init: size must be between 1 and 64" ; let t = Cstruct.create_unsafe bytes in begin match key with - | Some key -> - or_fail ~msg:"Blake2b.init" - (fun () -> init_key t.buffer size key.Cstruct.buffer) - | None -> - or_fail ~msg:"Blake2b.init" - (fun () -> init t.buffer size) + | Some key -> + or_fail ~msg:"Blake2b.init" + (fun () -> init_key t.buffer size key.Cstruct.buffer) + | None -> + or_fail ~msg:"Blake2b.init" + (fun () -> init t.buffer size) end ; t diff --git a/vendors/ocaml-tweetnacl/src/tweetnacl.ml b/vendors/ocaml-tweetnacl/src/tweetnacl.ml index 92ee8ebfe..1d386aee9 100644 --- a/vendors/ocaml-tweetnacl/src/tweetnacl.ml +++ b/vendors/ocaml-tweetnacl/src/tweetnacl.ml @@ -341,12 +341,12 @@ module Sign = struct let sk = Cstruct.create_unsafe skbytes in begin match seed with | None -> - Cstruct.(keypair (to_bigarray pk) (to_bigarray sk)) + Cstruct.(keypair (to_bigarray pk) (to_bigarray sk)) | Some cs -> - if Cstruct.len cs < seedbytes then - invalid_arg "Sign.keypair: seed must be at least 32 bytes long" ; - Cstruct.blit cs 0 sk 0 pkbytes ; - Cstruct.(keypair_seed (to_bigarray pk) (to_bigarray sk)) + if Cstruct.len cs < seedbytes then + invalid_arg "Sign.keypair: seed must be at least 32 bytes long" ; + Cstruct.blit cs 0 sk 0 pkbytes ; + Cstruct.(keypair_seed (to_bigarray pk) (to_bigarray sk)) end ; Pk pk, Sk sk @@ -393,12 +393,12 @@ module Sign = struct let mlen = Cstruct.create_unsafe 8 in let msg = Cstruct.(create (len smsg)) in let ret = Cstruct.(verify - (to_bigarray msg) (to_bigarray mlen) - (to_bigarray smsg) (to_bigarray pk)) in + (to_bigarray msg) (to_bigarray mlen) + (to_bigarray smsg) (to_bigarray pk)) in match ret with | 0 -> - let len = Cstruct.LE.get_uint64 mlen 0 |> Int64.to_int in - Some (Cstruct.sub msg 0 len) + let len = Cstruct.LE.get_uint64 mlen 0 |> Int64.to_int in + Some (Cstruct.sub msg 0 len) | _ -> None let verify_detached ~key ~signature msg = diff --git a/vendors/ocaml-tweetnacl/test/test.ml b/vendors/ocaml-tweetnacl/test/test.ml index 25ff6d048..4241a1695 100644 --- a/vendors/ocaml-tweetnacl/test/test.ml +++ b/vendors/ocaml-tweetnacl/test/test.ml @@ -21,8 +21,8 @@ let sign () = match Sign.verify ~key:pk signed_msg with | None -> failwith "Impossible to verify" | Some verified_msg -> - assert (Hex.of_cstruct msg = - Hex.of_cstruct (Cstruct.sub verified_msg Sign.bytes msglen)) + assert (Hex.of_cstruct msg = + Hex.of_cstruct (Cstruct.sub verified_msg Sign.bytes msglen)) let sign_detached () = let pk, sk = Sign.keypair () in @@ -38,8 +38,8 @@ let sign_extended () = match Sign.verify ~key:pk signed_msg with | None -> failwith "Impossible to verify" | Some verified_msg -> - assert (Hex.of_cstruct msg = - Hex.of_cstruct (Cstruct.sub verified_msg Sign.bytes msglen)) + assert (Hex.of_cstruct msg = + Hex.of_cstruct (Cstruct.sub verified_msg Sign.bytes msglen)) let sign_extended_detached () = let pk, sk = Sign.keypair () in