@ -69,9 +69,9 @@ let default_net_limits : P2p.limits = {
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
known_points_history_size = 500 ;
known_gids_history_size = 500 ;
known_peer_ids_history_size = 500 ;
max_known_points = Some (400, 300) ;
max_known_gids = Some (400, 300) ;
max_known_peer_ids = Some (400, 300) ;
let default_net = {
@ -113,8 +113,8 @@ let limit : P2p.limits Data_encoding.t =
read_buffer_size ; read_queue_size ; write_queue_size ;
incoming_app_message_queue_size ;
incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_gids_history_size ;
max_known_points ; max_known_gids ;
known_points_history_size ; known_peer_ids_history_size ;
max_known_points ; max_known_peer_ids ;
} ->
( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
@ -122,8 +122,8 @@ let limit : P2p.limits Data_encoding.t =
( read_buffer_size, read_queue_size, write_queue_size,
incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_gids_history_size,
max_known_points, max_known_gids
known_points_history_size, known_peer_ids_history_size,
max_known_points, max_known_peer_ids
(fun ( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
@ -131,8 +131,8 @@ let limit : P2p.limits Data_encoding.t =
( read_buffer_size, read_queue_size, write_queue_size,
incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_gids_history_size,
max_known_points, max_known_gids
known_points_history_size, known_peer_ids_history_size,
max_known_points, max_known_peer_ids
) ) ->
{ authentification_timeout ; min_connections ; expected_connections ;
max_connections ; backlog ; max_incoming_connections ;
@ -140,8 +140,8 @@ let limit : P2p.limits Data_encoding.t =
read_buffer_size ; read_queue_size ; write_queue_size ;
incoming_app_message_queue_size ;
incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_gids_history_size ;
max_known_points ; max_known_gids
known_points_history_size ; known_peer_ids_history_size ;
max_known_points ; max_known_peer_ids
@ -169,10 +169,10 @@ let limit : P2p.limits Data_encoding.t =
(opt "outgoing-message-queue-size" int31)
(dft "known_points_history_size" uint16
(dft "known_gids_history_size" uint16
(dft "known_peer_ids_history_size" uint16
(opt "max_known_points" (tup2 uint16 uint16))
(opt "max_known_gids" (tup2 uint16 uint16))
(opt "max_known_peer_ids" (tup2 uint16 uint16))
let net =
@ -303,9 +303,9 @@ let update
max_known_points =
peer_table_size ;
max_known_gids =
max_known_peer_ids =
peer_table_size ;
peer_table_size ;
} in
let net : net = {
expected_pow =
@ -15,7 +15,7 @@ let identity_file data_dir = data_dir // Node_identity_file.default_name
let show { Node_config_file.data_dir } =
|||| (identity_file data_dir) >>=? fun id ->
Format.printf "Gid: %a.@." P2p_types.Gid.pp id.gid ;
Format.printf "Peer_id: %a.@." P2p_types.Peer_id.pp id.peer_id ;
return ()
let generate { Node_config_file.data_dir ; net } =
@ -30,15 +30,15 @@ let generate { Node_config_file.data_dir ; net } =
Node_identity_file.write identity_file id >>=? fun () ->
"Stored the new identity (%a) into '%s'.@."
P2p.Gid.pp id.gid identity_file ;
P2p.Peer_id.pp id.peer_id identity_file ;
return ()
let check { Node_config_file.data_dir ; net = { expected_pow } } =
~expected_pow (identity_file data_dir) >>=? fun id ->
"Gid: %a. Proof of work is higher than %.2f.@."
P2p_types.Gid.pp id.gid expected_pow ;
"Peer_id: %a. Proof of work is higher than %.2f.@."
P2p_types.Peer_id.pp id.peer_id expected_pow ;
return ()
(** Main *)
@ -85,7 +85,7 @@ let init_node ?sandbox (config : Node_config_file.t) =
Node_identity_file.default_name) >>=? fun identity ->
"Peer's global id: %a"
P2p.Gid.pp identity.gid >>= fun () ->
P2p.Peer_id.pp identity.peer_id >>= fun () ->
(* TODO "WARN" when pow is below our expectation. *)
match with
@ -62,9 +62,9 @@ type limits = {
incoming_message_queue_size : int option ;
outgoing_message_queue_size : int option ;
known_gids_history_size : int ;
known_peer_ids_history_size : int ;
known_points_history_size : int ;
max_known_gids : (int * int) option ;
max_known_peer_ids : (int * int) option ;
max_known_points : (int * int) option ;
@ -96,10 +96,10 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched =
incoming_app_message_queue_size = limits.incoming_app_message_queue_size ;
incoming_message_queue_size = limits.incoming_message_queue_size ;
outgoing_message_queue_size = limits.outgoing_message_queue_size ;
known_gids_history_size = limits.known_gids_history_size ;
known_peer_ids_history_size = limits.known_peer_ids_history_size ;
known_points_history_size = limits.known_points_history_size ;
max_known_points = limits.max_known_points ;
max_known_gids = limits.max_known_gids ;
max_known_peer_ids = limits.max_known_peer_ids ;
let pool =
@ -173,7 +173,7 @@ module Real = struct
welcome ;
let gid { config } = config.identity.gid
let peer_id { config } = config.identity.peer_id
let maintain { maintenance } () =
P2p_maintenance.maintain maintenance
@ -191,9 +191,9 @@ module Real = struct
let connections { pool } () =
P2p_connection_pool.fold_connections pool
~init:[] ~f:(fun _gid c acc -> c :: acc)
let find_connection { pool } gid =
P2p_connection_pool.Gids.find_connection pool gid
~init:[] ~f:(fun _peer_id c acc -> c :: acc)
let find_connection { pool } peer_id =
P2p_connection_pool.Peer_ids.find_connection pool peer_id
let connection_info _net conn =
P2p_connection_pool.connection_info conn
let connection_stat _net conn =
@ -201,9 +201,9 @@ module Real = struct
let global_stat { pool } () =
P2p_connection_pool.pool_stat pool
let set_metadata { pool } conn meta =
P2p_connection_pool.Gids.set_metadata pool conn meta
P2p_connection_pool.Peer_ids.set_metadata pool conn meta
let get_metadata { pool } conn =
P2p_connection_pool.Gids.get_metadata pool conn
P2p_connection_pool.Peer_ids.get_metadata pool conn
let rec recv _net conn =
|||| conn >>=? fun msg ->
@ -216,7 +216,7 @@ module Real = struct
let pipes =
net.pool ~init:[]
~f:begin fun _gid conn acc ->
~f:begin fun _peer_id conn acc ->
(P2p_connection_pool.is_readable conn >>= function
| Ok () -> Lwt.return (Some conn)
| Error _ -> Lwt_utils.never_ending) :: acc
@ -286,7 +286,7 @@ module Fake = struct
let connection_info = {
Connection_info.incoming = false ;
gid = id.gid ;
peer_id = id.peer_id ;
id_point = (Ipaddr.V6.unspecified, None) ;
remote_socket_port = 0 ;
versions = [] ;
@ -295,17 +295,17 @@ module Fake = struct
type ('msg, 'meta) t = {
gid : Gid.t ;
peer_id : Peer_id.t ;
maintain : unit -> unit Lwt.t ;
roll : unit -> unit Lwt.t ;
shutdown : unit -> unit Lwt.t ;
connections : unit -> ('msg, 'meta) connection list ;
find_connection : Gid.t -> ('msg, 'meta) connection option ;
find_connection : Peer_id.t -> ('msg, 'meta) connection option ;
connection_info : ('msg, 'meta) connection -> Connection_info.t ;
connection_stat : ('msg, 'meta) connection -> Stat.t ;
global_stat : unit -> Stat.t ;
get_metadata : Gid.t -> 'meta option ;
set_metadata : Gid.t -> 'meta -> unit ;
get_metadata : Peer_id.t -> 'meta option ;
set_metadata : Peer_id.t -> 'meta -> unit ;
recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ;
recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ;
send : ('msg, 'meta) connection -> 'msg -> unit Lwt.t ;
@ -318,7 +318,7 @@ type ('msg, 'meta) net = ('msg, 'meta) t
let create ~config ~limits meta_cfg msg_cfg =
Real.create ~config ~limits meta_cfg msg_cfg >>= fun net ->
Lwt.return {
gid = Real.gid net ;
peer_id = Real.peer_id net ;
maintain = Real.maintain net ;
roll = Real.roll net ;
shutdown = Real.shutdown net ;
@ -338,7 +338,7 @@ let create ~config ~limits meta_cfg msg_cfg =
let faked_network = {
gid = ;
peer_id = ;
maintain = Lwt.return ;
roll = Lwt.return ;
shutdown = Lwt.return ;
@ -357,7 +357,7 @@ let faked_network = {
pool = None
let gid net = net.gid
let peer_id net = net.peer_id
let maintain net = net.maintain ()
let roll net = net.roll ()
let shutdown net = net.shutdown ()
@ -404,19 +404,19 @@ module RPC = struct
P2p_connection_pool.connect ~timeout pool point >>|? ignore
module Connection = struct
let info net gid =
let info net peer_id =
match net.pool with
| None -> None
| Some pool ->
(P2p_connection_pool.Gids.find_connection pool gid)
(P2p_connection_pool.Peer_ids.find_connection pool peer_id)
let kick net gid wait =
let kick net peer_id wait =
match net.pool with
| None -> Lwt.return_unit
| Some pool ->
match P2p_connection_pool.Gids.find_connection pool gid with
match P2p_connection_pool.Peer_ids.find_connection pool peer_id with
| None -> Lwt.return_unit
| Some conn -> P2p_connection_pool.disconnect ~wait conn
@ -426,7 +426,7 @@ module RPC = struct
| Some pool ->
pool ~init:[]
~f:begin fun _gid c acc ->
~f:begin fun _peer_id c acc ->
P2p_connection_pool.connection_info c :: acc
@ -456,33 +456,33 @@ module RPC = struct
trusted : bool ;
greylisted_end : Time.t ;
state : state ;
gid : Gid.t option ;
peer_id : Peer_id.t option ;
last_failed_connection : Time.t option ;
last_rejected_connection : (Gid.t * Time.t) option ;
last_established_connection : (Gid.t * Time.t) option ;
last_disconnection : (Gid.t * Time.t) option ;
last_seen : (Gid.t * Time.t) option ;
last_rejected_connection : (Peer_id.t * Time.t) option ;
last_established_connection : (Peer_id.t * Time.t) option ;
last_disconnection : (Peer_id.t * Time.t) option ;
last_seen : (Peer_id.t * Time.t) option ;
last_miss : Time.t option ;
let info_encoding =
let open Data_encoding in
(fun { trusted ; greylisted_end ; state ; gid ;
(fun { trusted ; greylisted_end ; state ; peer_id ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection ;
last_seen ; last_miss ;
} ->
(trusted, greylisted_end, state, gid,
(trusted, greylisted_end, state, peer_id,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection,
last_seen, last_miss)
(fun (trusted, greylisted_end, state, gid,
(fun (trusted, greylisted_end, state, peer_id,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection,
last_seen, last_miss) ->
{ trusted ; greylisted_end ; state ; gid ;
{ trusted ; greylisted_end ; state ; peer_id ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection ;
last_seen ; last_miss ;
@ -492,25 +492,25 @@ module RPC = struct
(req "trusted" bool)
(dft "greylisted_end" Time.encoding Time.epoch)
(req "state" state_encoding)
(opt "gid" Gid.encoding)
(opt "peer_id" Peer_id.encoding)
(opt "last_failed_connection" Time.encoding)
(opt "last_rejected_connection" (tup2 Gid.encoding Time.encoding))
(opt "last_established_connection" (tup2 Gid.encoding Time.encoding))
(opt "last_disconnection" (tup2 Gid.encoding Time.encoding))
(opt "last_seen" (tup2 Gid.encoding Time.encoding))
(opt "last_rejected_connection" (tup2 Peer_id.encoding Time.encoding))
(opt "last_established_connection" (tup2 Peer_id.encoding Time.encoding))
(opt "last_disconnection" (tup2 Peer_id.encoding Time.encoding))
(opt "last_seen" (tup2 Peer_id.encoding Time.encoding))
(opt "last_miss" Time.encoding))
let info_of_point_info i =
let open P2p_connection_pool in
let open P2p_connection_pool_types in
let state, gid = match Point_info.State.get i with
let state, peer_id = match Point_info.State.get i with
| Requested _ -> Requested, None
| Accepted { current_gid } -> Accepted, Some current_gid
| Running { current_gid } -> Running, Some current_gid
| Accepted { current_peer_id } -> Accepted, Some current_peer_id
| Running { current_peer_id } -> Running, Some current_peer_id
| Disconnected -> Disconnected, None in
trusted = trusted i ;
state ; gid ;
state ; peer_id ;
greylisted_end = greylisted_end i ;
last_failed_connection = last_failed_connection i ;
last_rejected_connection = last_rejected_connection i ;
@ -568,7 +568,7 @@ module RPC = struct
module Gid = struct
module Peer_id = struct
type state =
| Accepted
| Running
@ -631,22 +631,22 @@ module RPC = struct
(opt "last_seen" (tup2 Id_point.encoding Time.encoding))
(opt "last_miss" (tup2 Id_point.encoding Time.encoding))))
let info_of_gid_info pool i =
let info_of_peer_info pool i =
let open P2p_connection_pool in
let open P2p_connection_pool_types in
let state, id_point = match Gid_info.State.get i with
let state, id_point = match Peer_info.State.get i with
| Accepted { current_point } -> Accepted, Some current_point
| Running { current_point } -> Running, Some current_point
| Disconnected -> Disconnected, None
let gid = Gid_info.gid i in
let meta = Gid_info.metadata i in
let peer_id = Peer_info.peer_id i in
let meta = Peer_info.metadata i in
let score = P2p_connection_pool.score pool meta in
let stat =
match P2p_connection_pool.Gids.find_connection pool gid with
match P2p_connection_pool.Peer_ids.find_connection pool peer_id with
| None -> Stat.empty
| Some conn -> P2p_connection_pool.connection_stat conn
in Gid_info.{
in Peer_info.{
score ;
trusted = trusted i ;
state ;
@ -660,49 +660,49 @@ module RPC = struct
last_miss = last_miss i ;
let info net gid =
let info net peer_id =
match net.pool with
| None -> None
| Some pool -> begin
match pool gid with
| Some info -> Some (info_of_gid_info pool info)
match pool peer_id with
| Some info -> Some (info_of_peer_info pool info)
| None -> None
module Event = P2p_connection_pool_types.Gid_info.Event
module Event = P2p_connection_pool_types.Peer_info.Event
let events ?(max=max_int) ?(rev=false) net gid =
let events ?(max=max_int) ?(rev=false) net peer_id =
match net.pool with
| None -> []
| Some pool ->
( pool gid)
( pool peer_id)
~f:begin fun gi ->
let evts = P2p_connection_pool_types.Gid_info.fold_events gi
let evts = P2p_connection_pool_types.Peer_info.fold_events gi
~init:[] ~f:(fun a e -> e :: a) in
(if rev then list_rev_sub else list_sub) evts max
let watch net gid =
let watch net peer_id =
match net.pool with
| None -> raise Not_found
| Some pool ->
match pool gid with
match pool peer_id with
| None -> raise Not_found
| Some gi -> gi
| Some gi -> gi
let infos ?(restrict=[]) net =
match net.pool with
| None -> []
| Some pool ->
P2p_connection_pool.Gids.fold_known pool
P2p_connection_pool.Peer_ids.fold_known pool
~f:begin fun gid i a ->
let info = info_of_gid_info pool i in
~f:begin fun peer_id i a ->
let info = info_of_peer_info pool i in
match restrict with
| [] -> (gid, info) :: a
| _ when List.mem info.state restrict -> (gid, info) :: a
| [] -> (peer_id, info) :: a
| _ when List.mem info.state restrict -> (peer_id, info) :: a
| _ -> a
@ -17,7 +17,7 @@ type port = int
module Version = P2p_types.Version
(** A global identifier for a peer, a.k.a. an identity *)
module Gid = P2p_types.Gid
module Peer_id = P2p_types.Peer_id
module Identity = P2p_types.Identity
@ -64,7 +64,7 @@ type config = {
peers_file : string ;
(** The path to the JSON file where the metadata associated to
gids are loaded / stored. *)
peer_ids are loaded / stored. *)
closed_network : bool ;
(** If [true], the only accepted connections are from peers whose
@ -115,11 +115,11 @@ type limits = {
outgoing_message_queue_size : int option ;
(** Various bounds for internal queues. *)
known_gids_history_size : int ;
known_peer_ids_history_size : int ;
known_points_history_size : int ;
(** Size of circular log buffers, in number of events recorded. *)
max_known_gids : (int * int) option ;
max_known_peer_ids : (int * int) option ;
max_known_points : (int * int) option ;
(** Optional limitation of internal hashtables (max, target) *)
@ -136,8 +136,8 @@ val create :
config:config -> limits:limits ->
'meta meta_config -> 'msg message_config -> ('msg, 'meta) net Lwt.t
(** Return one's gid *)
val gid : ('msg, 'meta) net -> Gid.t
(** Return one's peer_id *)
val peer_id : ('msg, 'meta) net -> Peer_id.t
(** A maintenance operation : try and reach the ideal number of peers *)
val maintain : ('msg, 'meta) net -> unit Lwt.t
@ -154,8 +154,8 @@ type ('msg, 'meta) connection
(** Access the domain of active peers *)
val connections : ('msg, 'meta) net -> ('msg, 'meta) connection list
(** Return the active peer with identity [gid] *)
val find_connection : ('msg, 'meta) net -> Gid.t -> ('msg, 'meta) connection option
(** Return the active peer with identity [peer_id] *)
val find_connection : ('msg, 'meta) net -> Peer_id.t -> ('msg, 'meta) connection option
(** Access the info of an active peer, if available *)
val connection_info :
@ -165,8 +165,8 @@ val connection_stat :
val global_stat : ('msg, 'meta) net -> Stat.t
(** Accessors for meta information about a global identifier *)
val get_metadata : ('msg, 'meta) net -> Gid.t -> 'meta option
val set_metadata : ('msg, 'meta) net -> Gid.t -> 'meta -> unit
val get_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta option
val set_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta -> unit
(** Wait for a message from a given connection. *)
val recv :
@ -199,8 +199,8 @@ module RPC : sig
val connect : ('msg, 'meta) net -> Point.t -> float -> unit tzresult Lwt.t
module Connection : sig
val info : ('msg, 'meta) net -> Gid.t -> Connection_info.t option
val kick : ('msg, 'meta) net -> Gid.t -> bool -> unit Lwt.t
val info : ('msg, 'meta) net -> Peer_id.t -> Connection_info.t option
val kick : ('msg, 'meta) net -> Peer_id.t -> bool -> unit Lwt.t
val list : ('msg, 'meta) net -> Connection_info.t list
val count : ('msg, 'meta) net -> int
@ -219,12 +219,12 @@ module RPC : sig
trusted : bool ;
greylisted_end : Time.t ;
state : state ;
gid : Gid.t option ;
peer_id : Peer_id.t option ;
last_failed_connection : Time.t option ;
last_rejected_connection : (Gid.t * Time.t) option ;
last_established_connection : (Gid.t * Time.t) option ;
last_disconnection : (Gid.t * Time.t) option ;
last_seen : (Gid.t * Time.t) option ;
last_rejected_connection : (Peer_id.t * Time.t) option ;
last_established_connection : (Peer_id.t * Time.t) option ;
last_disconnection : (Peer_id.t * Time.t) option ;
last_seen : (Peer_id.t * Time.t) option ;
last_miss : Time.t option ;
@ -242,7 +242,7 @@ module RPC : sig
('msg, 'meta) net -> Point.t -> Event.t Lwt_stream.t * Watcher.stopper
module Gid : sig
module Peer_id : sig
type state =
| Accepted
@ -266,12 +266,16 @@ module RPC : sig
val info_encoding : info Data_encoding.t
module Event = P2p_connection_pool_types.Gid_info.Event
module Event = P2p_connection_pool_types.Peer_info.Event
val info : ('msg, 'meta) net -> Gid.t -> info option
val infos : ?restrict:state list -> ('msg, 'meta) net -> (Gid.t * info) list
val events : ?max:int -> ?rev:bool -> ('msg, 'meta) net -> Gid.t -> Event.t list
val watch : ('msg, 'meta) net -> Gid.t -> Event.t Lwt_stream.t * Watcher.stopper
val info :
('msg, 'meta) net -> Peer_id.t -> info option
val infos :
?restrict:state list -> ('msg, 'meta) net -> (Peer_id.t * info) list
val events :
?max:int -> ?rev:bool -> ('msg, 'meta) net -> Peer_id.t -> Event.t list
val watch :
('msg, 'meta) net -> Peer_id.t -> Event.t Lwt_stream.t * Watcher.stopper
@ -29,7 +29,7 @@ type error += Encoding_error
type error += Rejected
type error += Decoding_error
type error += Myself of Id_point.t
type error += Not_enough_proof_of_work of Gid.t
type error += Not_enough_proof_of_work of Peer_id.t
type error += Invalid_auth
module Crypto = struct
@ -186,18 +186,19 @@ let authenticate
let remote_listening_port =
if incoming then msg.port else Some remote_socket_port in
let id_point = remote_addr, remote_listening_port in
let remote_gid = Crypto_box.hash msg.public_key in
let remote_peer_id = Crypto_box.hash msg.public_key in
(remote_gid <> identity.Identity.gid)
(remote_peer_id <> identity.Identity.peer_id)
(Myself id_point) >>=? fun () ->
msg.public_key msg.proof_of_work_stamp proof_of_work_target)
(Not_enough_proof_of_work remote_gid) >>=? fun () ->
(Not_enough_proof_of_work remote_peer_id) >>=? fun () ->
let channel_key =
Crypto_box.precompute identity.Identity.secret_key msg.public_key in
let info =
{ Connection_info.gid = remote_gid ; versions = msg.versions ; incoming ;
{ Connection_info.peer_id = remote_peer_id ;
versions = msg.versions ; incoming ;
id_point ; remote_socket_port ;} in
let cryptobox_data =
{ Crypto.channel_key ; local_nonce ;
@ -25,7 +25,7 @@ type error += Encoding_error
type error += Decoding_error
type error += Rejected
type error += Myself of Id_point.t
type error += Not_enough_proof_of_work of Gid.t
type error += Not_enough_proof_of_work of Peer_id.t
type error += Invalid_auth
type authenticated_fd
@ -11,9 +11,9 @@
(* TODO do not recompute list_known_points at each requests... but
only once in a while, e.g. every minutes or when a point
or the associated gid is blacklisted. *)
or the associated peer_id is blacklisted. *)
(* TODO allow to track "requested gids" when we reconnect to a point. *)
(* TODO allow to track "requested peer_ids" when we reconnect to a point. *)
open P2p_types
open P2p_connection_pool_types
@ -130,19 +130,19 @@ module LogEvent = struct
| Too_few_connections
| Too_many_connections
| New_point of Point.t
| New_peer of Gid.t
| New_peer of Peer_id.t
| Incoming_connection of Point.t
| Outgoing_connection of Point.t
| Authentication_failed of Point.t
| Accepting_request of Point.t * Id_point.t * Gid.t
| Rejecting_request of Point.t * Id_point.t * Gid.t
| Request_rejected of Point.t * (Id_point.t * Gid.t) option
| Connection_established of Id_point.t * Gid.t
| Disconnection of Gid.t
| External_disconnection of Gid.t
| Accepting_request of Point.t * Id_point.t * Peer_id.t
| Rejecting_request of Point.t * Id_point.t * Peer_id.t
| Request_rejected of Point.t * (Id_point.t * Peer_id.t) option
| Connection_established of Id_point.t * Peer_id.t
| Disconnection of Peer_id.t
| External_disconnection of Peer_id.t
| Gc_points
| Gc_gids
| Gc_peer_ids
let encoding =
let open Data_encoding in
@ -162,7 +162,7 @@ module LogEvent = struct
(function New_point p -> Some p | _ -> None)
(fun p -> New_point p) ;
case ~tag:3 (branch_encoding "new_peer"
(obj1 (req "gid" Gid.encoding)))
(obj1 (req "peer_id" Peer_id.encoding)))
(function New_peer p -> Some p | _ -> None)
(fun p -> New_peer p) ;
case ~tag:4 (branch_encoding "incoming_connection"
@ -181,42 +181,42 @@ module LogEvent = struct
(req "point" Point.encoding)
(req "id_point" Id_point.encoding)
(req "gid" Gid.encoding)))
(req "peer_id" Peer_id.encoding)))
(function Accepting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None)
(fun (p, id_p, g) -> Accepting_request (p, id_p, g)) ;
case ~tag:8 (branch_encoding "rejecting_request"
(req "point" Point.encoding)
(req "id_point" Id_point.encoding)
(req "gid" Gid.encoding)))
(req "peer_id" Peer_id.encoding)))
(function Rejecting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None)
(fun (p, id_p, g) -> Rejecting_request (p, id_p, g)) ;
case ~tag:9 (branch_encoding "request_rejected"
(req "point" Point.encoding)
(opt "identity" (tup2 Id_point.encoding Gid.encoding))))
(opt "identity" (tup2 Id_point.encoding Peer_id.encoding))))
(function Request_rejected (p, id) -> Some (p, id) | _ -> None)
(fun (p, id) -> Request_rejected (p, id)) ;
case ~tag:10 (branch_encoding "connection_established"
(req "id_point" Id_point.encoding)
(req "gid" Gid.encoding)))
(req "peer_id" Peer_id.encoding)))
(function Connection_established (id_p, g) -> Some (id_p, g) | _ -> None)
(fun (id_p, g) -> Connection_established (id_p, g)) ;
case ~tag:11 (branch_encoding "disconnection"
(obj1 (req "gid" Gid.encoding)))
(obj1 (req "peer_id" Peer_id.encoding)))
(function Disconnection g -> Some g | _ -> None)
(fun g -> Disconnection g) ;
case ~tag:12 (branch_encoding "external_disconnection"
(obj1 (req "gid" Gid.encoding)))
(obj1 (req "peer_id" Peer_id.encoding)))
(function External_disconnection g -> Some g | _ -> None)
(fun g -> External_disconnection g) ;
case ~tag:13 (branch_encoding "gc_points" empty)
(function Gc_points -> Some () | _ -> None)
(fun () -> Gc_points) ;
case ~tag:14 (branch_encoding "gc_gids" empty)
(function Gc_gids -> Some () | _ -> None)
(fun () -> Gc_gids) ;
case ~tag:14 (branch_encoding "gc_peer_ids" empty)
(function Gc_peer_ids -> Some () | _ -> None)
(fun () -> Gc_peer_ids) ;
let log watcher event = Watcher.notify watcher event
@ -224,23 +224,23 @@ module LogEvent = struct
let too_few_connections watcher = log watcher Too_few_connections
let too_many_connections watcher = log watcher Too_many_connections
let new_point watcher ~point = log watcher (New_point point)
let new_peer watcher ~gid = log watcher (New_peer gid)
let new_peer watcher ~peer_id = log watcher (New_peer peer_id)
let incoming_connection watcher ~point = log watcher (Incoming_connection point)
let outgoing_connection watcher ~point = log watcher (Outgoing_connection point)
let authentication_failed watcher ~point = log watcher (Authentication_failed point)
let accepting_request watcher ~id_point ~point ~gid =
log watcher (Accepting_request (point, id_point, gid))
let rejecting_request watcher ~id_point ~point ~gid =
log watcher (Rejecting_request (point, id_point, gid))
let accepting_request watcher ~id_point ~point ~peer_id =
log watcher (Accepting_request (point, id_point, peer_id))
let rejecting_request watcher ~id_point ~point ~peer_id =
log watcher (Rejecting_request (point, id_point, peer_id))
let request_rejected watcher ?credentials ~point =
log watcher (Request_rejected (point, credentials))
let connection_established watcher ~id_point ~gid =
log watcher (Connection_established (id_point, gid))
let disconnection watcher ~is_external ~gid =
log watcher (if is_external then External_disconnection gid
else Disconnection gid)
let connection_established watcher ~id_point ~peer_id =
log watcher (Connection_established (id_point, peer_id))
let disconnection watcher ~is_external ~peer_id =
log watcher (if is_external then External_disconnection peer_id
else Disconnection peer_id)
let gc_points watcher = log watcher Gc_points
let gc_gids watcher = log watcher Gc_gids
let gc_peer_ids watcher = log watcher Gc_peer_ids
type config = {
@ -262,10 +262,10 @@ type config = {
incoming_message_queue_size : int option ;
outgoing_message_queue_size : int option ;
known_gids_history_size : int ;
known_peer_ids_history_size : int ;
known_points_history_size : int ;
max_known_points : (int * int) option ; (* max, gc target *)
max_known_gids : (int * int) option ; (* max, gc target *)
max_known_peer_ids : (int * int) option ; (* max, gc target *)
type 'meta meta_config = {
@ -284,8 +284,8 @@ type ('msg, 'meta) t = {
meta_config : 'meta meta_config ;
message_config : 'msg message_config ;
my_id_points : unit Point.Table.t ;
known_gids : (('msg, 'meta) connection, 'meta) Gid_info.t Gid.Table.t ;
connected_gids : (('msg, 'meta) connection, 'meta) Gid_info.t Gid.Table.t ;
known_peer_ids : (('msg, 'meta) connection, 'meta) Peer_info.t Peer_id.Table.t ;
connected_peer_ids : (('msg, 'meta) connection, 'meta) Peer_info.t Peer_id.Table.t ;
known_points : ('msg, 'meta) connection Point_info.t Point.Table.t ;
connected_points : ('msg, 'meta) connection Point_info.t Point.Table.t ;
incoming : Canceler.t Point.Table.t ;
@ -307,7 +307,7 @@ and ('msg, 'meta) connection = {
canceler : Canceler.t ;
messages : (int * 'msg) Lwt_pipe.t ;
conn : 'msg Message.t P2p_connection.t ;
gid_info : (('msg, 'meta) connection, 'meta) Gid_info.t ;
peer_info : (('msg, 'meta) connection, 'meta) Peer_info.t ;
point_info : ('msg, 'meta) connection Point_info.t option ;
answerer : 'msg Answerer.t ;
mutable wait_close : bool ;
@ -366,57 +366,57 @@ let register_point pool ?trusted (addr, port as point) =
| pi -> pi
(* Bounded table used to garbage collect gid infos when needed. The
strategy used is to remove the info of the gid with the lowest
(* Bounded table used to garbage collect peer_id infos when needed. The
strategy used is to remove the info of the peer_id with the lowest
score first. In case of equality, the info of the most recent added
gid is removed. The rationale behind this choice is that in the
peer_id is removed. The rationale behind this choice is that in the
case of a flood attack, the newly added infos will probably belong
to gids with the same (low) score and removing the most recent ones
ensure that older (and probably legit) gid infos are kept. *)
module GcGidSet = Utils.Bounded(struct
type t = float * Time.t * Gid.t
to peer_ids with the same (low) score and removing the most recent ones
ensure that older (and probably legit) peer_id infos are kept. *)
module GcPeer_idSet = Utils.Bounded(struct
type t = float * Time.t * Peer_id.t
let compare (s, t, _) (s', t', _) =
let score_cmp = s s' in
if score_cmp = 0 then t t' else - score_cmp
let gc_gids ({ meta_config = { score } ;
config = { max_known_gids } ;
known_gids ; } as pool) =
match max_known_gids with
let gc_peer_ids ({ meta_config = { score } ;
config = { max_known_peer_ids } ;
known_peer_ids ; } as pool) =
match max_known_peer_ids with
| None -> ()
| Some (_, target) ->
let table = GcGidSet.create target in
Gid.Table.iter (fun gid gid_info ->
let created = Gid_info.created gid_info in
let score = score @@ Gid_info.metadata gid_info in
GcGidSet.insert (score, created, gid) table
) known_gids ;
let to_remove = GcGidSet.get table in
ListLabels.iter to_remove ~f:begin fun (_, _, gid) ->
Gid.Table.remove known_gids gid
let table = GcPeer_idSet.create target in
Peer_id.Table.iter (fun peer_id peer_info ->
let created = Peer_info.created peer_info in
let score = score @@ Peer_info.metadata peer_info in
GcPeer_idSet.insert (score, created, peer_id) table
) known_peer_ids ;
let to_remove = GcPeer_idSet.get table in
ListLabels.iter to_remove ~f:begin fun (_, _, peer_id) ->
Peer_id.Table.remove known_peer_ids peer_id
end ;
LogEvent.gc_gids pool.watcher
LogEvent.gc_peer_ids pool.watcher
let register_peer pool gid =
match Gid.Table.find pool.known_gids gid with
let register_peer pool peer_id =
match Peer_id.Table.find pool.known_peer_ids peer_id with
| exception Not_found ->
Lwt_condition.broadcast () ;
let peer = Gid_info.create gid ~metadata:pool.meta_config.initial in
iter_option pool.config.max_known_gids ~f:begin fun (max, _) ->
if Gid.Table.length pool.known_gids >= max then gc_gids pool
let peer = Peer_info.create peer_id ~metadata:pool.meta_config.initial in
iter_option pool.config.max_known_peer_ids ~f:begin fun (max, _) ->
if Peer_id.Table.length pool.known_peer_ids >= max then gc_peer_ids pool
end ;
Gid.Table.add pool.known_gids gid peer ;
LogEvent.new_peer pool.watcher gid ;
Peer_id.Table.add pool.known_peer_ids peer_id peer ;
LogEvent.new_peer pool.watcher peer_id ;
| peer -> peer
let register_new_point pool _gid point =
let register_new_point pool _peer_id point =
if not (Point.Table.mem pool.my_id_points point) then
ignore (register_point pool point)
let register_new_points pool gid points =
List.iter (register_new_point pool gid) points ;
let register_new_points pool peer_id points =
List.iter (register_new_point pool peer_id) points ;
let compare_known_point_info p1 p2 =
@ -439,17 +439,17 @@ let compare_known_point_info p1 p2 =
| true, false -> 1
| true, true -> compare_last_seen p2 p1
let list_known_points pool _gid () =
let list_known_points pool _peer_id () =
let knowns =
Point.Table.fold (fun _ pi acc -> pi :: acc) pool.known_points [] in
let best_knowns =
Utils.take_n ~compare:compare_known_point_info 50 knowns in
Lwt.return ( Point_info.point best_knowns)
let active_connections pool = Gid.Table.length pool.connected_gids
let active_connections pool = Peer_id.Table.length pool.connected_peer_ids
let create_connection pool conn id_point pi gi _version =
let gid = Gid_info.gid gi in
let peer_id = Peer_info.peer_id gi in
let canceler = Canceler.create () in
let size =
map_option pool.config.incoming_app_message_queue_size
@ -458,32 +458,32 @@ let create_connection pool conn id_point pi gi _version =
let callback =
{ Answerer.message =
(fun size msg -> Lwt_pipe.push messages (size, msg)) ;
advertise = register_new_points pool gid ;
bootstrap = list_known_points pool gid ;
advertise = register_new_points pool peer_id ;
bootstrap = list_known_points pool peer_id ;
} in
let answerer = conn canceler callback in
let conn =
{ conn ; point_info = pi ; gid_info = gi ;
{ conn ; point_info = pi ; peer_info = gi ;
messages ; canceler ; answerer ; wait_close = false } in
iter_option pi ~f:begin fun pi ->
let point = Point_info.point pi in
Point_info.State.set_running pi gid conn ;
Point_info.State.set_running pi peer_id conn ;
Point.Table.add pool.connected_points point pi ;
end ;
LogEvent.connection_established pool.watcher ~id_point ~gid ;
Gid_info.State.set_running gi id_point conn ;
Gid.Table.add pool.connected_gids gid gi ;
LogEvent.connection_established pool.watcher ~id_point ~peer_id ;
Peer_info.State.set_running gi id_point conn ;
Peer_id.Table.add pool.connected_peer_ids peer_id gi ;
Lwt_condition.broadcast () ;
Canceler.on_cancel canceler begin fun () ->
lwt_debug "Disconnect: %a (%a)"
Gid.pp gid Id_point.pp id_point >>= fun () ->
Peer_id.pp peer_id Id_point.pp id_point >>= fun () ->
iter_option ~f:Point_info.State.set_disconnected pi;
LogEvent.disconnection pool.watcher ~is_external:false ~gid ;
Gid_info.State.set_disconnected gi ;
LogEvent.disconnection pool.watcher ~is_external:false ~peer_id ;
Peer_info.State.set_disconnected gi ;
iter_option pi ~f:begin fun pi ->
Point.Table.remove pool.connected_points (Point_info.point pi) ;
end ;
Gid.Table.remove pool.connected_gids gid ;
Peer_id.Table.remove pool.connected_peer_ids peer_id ;
if pool.config.max_connections <= active_connections pool then begin
Lwt_condition.broadcast () ;
LogEvent.too_many_connections pool.watcher ;
@ -501,9 +501,9 @@ let disconnect ?(wait = false) conn =
Canceler.cancel conn.canceler >>= fun () ->
type error += Rejected of Gid.t
type error += Rejected of Peer_id.t
type error += Unexpected_point_state
type error += Unexpected_gid_state
type error += Unexpected_peer_id_state
let may_register_my_id_point pool = function
| [P2p_connection.Myself (addr, Some port)] ->
@ -551,7 +551,7 @@ let authenticate pool ?pi canceler fd point =
match pi, remote_pi with
| None, None -> None
| Some _ as pi, _ | _, (Some _ as pi) -> pi in
let gi = register_peer pool info.gid in
let gi = register_peer pool info.peer_id in
let acceptable_versions =
Version.common info.versions pool.message_config.versions
@ -567,8 +567,8 @@ let authenticate pool ?pi canceler fd point =
| Accepted _ | Running _ -> false
let acceptable_gid =
match Gid_info.State.get gi with
let acceptable_peer_id =
match Peer_info.State.get gi with
| Accepted _ ->
(* TODO: in some circumstances cancel and accept... *)
@ -578,12 +578,12 @@ let authenticate pool ?pi canceler fd point =
if incoming then
Point.Table.remove pool.incoming point ;
match acceptable_versions with
| Some version when acceptable_gid && acceptable_point -> begin
| Some version when acceptable_peer_id && acceptable_point -> begin
LogEvent.accepting_request pool.watcher
~id_point:info.id_point ~point ~gid:info.gid ;
~id_point:info.id_point ~point ~peer_id:info.peer_id ;
iter_option connection_pi
~f:(fun pi -> Point_info.State.set_accepted pi info.gid canceler) ;
Gid_info.State.set_accepted gi info.id_point canceler ;
~f:(fun pi -> Point_info.State.set_accepted pi info.peer_id canceler) ;
Peer_info.State.set_accepted gi info.id_point canceler ;
lwt_debug "authenticate: %a -> accept %a"
Point.pp point
Connection_info.pp info >>= fun () ->
@ -599,12 +599,12 @@ let authenticate pool ?pi canceler fd point =
end ~on_error: begin fun err ->
if incoming then
LogEvent.request_rejected pool.watcher
~credentials:(info.id_point, info.gid) ~point ;
~credentials:(info.id_point, info.peer_id) ~point ;
lwt_debug "authenticate: %a -> rejected %a"
Point.pp point
Connection_info.pp info >>= fun () ->
iter_option connection_pi ~f:Point_info.State.set_disconnected;
Gid_info.State.set_disconnected gi ;
Peer_info.State.set_disconnected gi ;
Lwt.return (Error err)
end >>=? fun conn ->
let id_point =
@ -615,17 +615,17 @@ let authenticate pool ?pi canceler fd point =
| _ -> begin
LogEvent.rejecting_request pool.watcher
~id_point:info.id_point ~point ~gid:info.gid ;
lwt_debug "authenticate: %a -> kick %a point: %B gid: %B"
~id_point:info.id_point ~point ~peer_id:info.peer_id ;
lwt_debug "authenticate: %a -> kick %a point: %B peer_id: %B"
Point.pp point
Connection_info.pp info
acceptable_point acceptable_gid >>= fun () ->
acceptable_point acceptable_peer_id >>= fun () ->
P2p_connection.kick auth_fd >>= fun () ->
if not incoming then begin
iter_option ~f:Point_info.State.set_disconnected pi ;
(* FIXME Gid_info.State.set_disconnected ~requested:true gi ; *)
(* FIXME Peer_info.State.set_disconnected ~requested:true gi ; *)
end ;
fail (Rejected info.gid)
fail (Rejected info.peer_id)
type error += Pending_connection
@ -640,8 +640,8 @@ let fail_unless_disconnected_point pi =
| Requested _ | Accepted _ -> fail Pending_connection
| Running _ -> fail Connected
let fail_unless_disconnected_gid gi =
match Gid_info.State.get gi with
let fail_unless_disconnected_peer_id gi =
match Peer_info.State.get gi with
| Disconnected -> return ()
| Accepted _ -> fail Pending_connection
| Running _ -> fail Connected
@ -725,76 +725,76 @@ let write_now { conn } msg =
P2p_connection.write_now conn (Message msg)
let write_all pool msg =
(fun _gid gi ->
match Gid_info.State.get gi with
(fun _peer_id gi ->
match Peer_info.State.get gi with
| Running { data = conn } ->
ignore (write_now conn msg : bool tzresult )
| _ -> ())
let broadcast_bootstrap_msg pool =
(fun _gid gi ->
match Gid_info.State.get gi with
(fun _peer_id gi ->
match Peer_info.State.get gi with
| Running { data = { conn } } ->
ignore (P2p_connection.write_now conn Bootstrap : bool tzresult )
| _ -> ())
module Gids = struct
module Peer_ids = struct
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Gid_info.t
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Peer_info.t
let info { known_gids } point =
try Some (Gid.Table.find known_gids point)
let info { known_peer_ids } point =
try Some (Peer_id.Table.find known_peer_ids point)
with Not_found -> None
let get_metadata pool gid =
try Some (Gid_info.metadata (Gid.Table.find pool.known_gids gid))
let get_metadata pool peer_id =
try Some (Peer_info.metadata (Peer_id.Table.find pool.known_peer_ids peer_id))
with Not_found -> None
let get_score pool gid =
try Some (pool.meta_config.score @@ Gid_info.metadata (Gid.Table.find pool.known_gids gid))
let get_score pool peer_id =
try Some (pool.meta_config.score @@ Peer_info.metadata (Peer_id.Table.find pool.known_peer_ids peer_id))
with Not_found -> None
let set_metadata pool gid data =
Gid_info.set_metadata (register_peer pool gid) data
let set_metadata pool peer_id data =
Peer_info.set_metadata (register_peer pool peer_id) data
let get_trusted pool gid =
try Gid_info.trusted (Gid.Table.find pool.known_gids gid)
let get_trusted pool peer_id =
try Peer_info.trusted (Peer_id.Table.find pool.known_peer_ids peer_id)
with Not_found -> false
let set_trusted pool gid =
try Gid_info.set_trusted (register_peer pool gid)
let set_trusted pool peer_id =
try Peer_info.set_trusted (register_peer pool peer_id)
with Not_found -> ()
let unset_trusted pool gid =
try Gid_info.unset_trusted (Gid.Table.find pool.known_gids gid)
let unset_trusted pool peer_id =
try Peer_info.unset_trusted (Peer_id.Table.find pool.known_peer_ids peer_id)
with Not_found -> ()
let find_connection pool gid =
let find_connection pool peer_id =
(info pool gid)
(info pool peer_id)
~f:(fun p ->
match Gid_info.State.get p with
match Peer_info.State.get p with
| Running { data } -> Some data
| _ -> None)
let fold_known pool ~init ~f =
Gid.Table.fold f pool.known_gids init
Peer_id.Table.fold f pool.known_peer_ids init
let fold_connected pool ~init ~f =
Gid.Table.fold f pool.connected_gids init
Peer_id.Table.fold f pool.connected_peer_ids init
let fold_connections pool ~init ~f =
Gids.fold_connected pool ~init ~f:begin fun gid gi acc ->
match Gid_info.State.get gi with
| Running { data } -> f gid data acc
Peer_ids.fold_connected pool ~init ~f:begin fun peer_id gi acc ->
match Peer_info.State.get gi with
| Running { data } -> f peer_id data acc
| _ -> acc
@ -806,16 +806,16 @@ module Points = struct
try Some (Point.Table.find known_points point)
with Not_found -> None
let get_trusted pool gid =
try Point_info.trusted (Point.Table.find pool.known_points gid)
let get_trusted pool peer_id =
try Point_info.trusted (Point.Table.find pool.known_points peer_id)
with Not_found -> false
let set_trusted pool gid =
try Point_info.set_trusted (register_point pool gid)
let set_trusted pool peer_id =
try Point_info.set_trusted (register_point pool peer_id)
with Not_found -> ()
let unset_trusted pool gid =
try Point_info.unset_trusted (Point.Table.find pool.known_points gid)
let unset_trusted pool peer_id =
try Point_info.unset_trusted (Point.Table.find pool.known_points peer_id)
with Not_found -> ()
let find_connection pool point =
@ -857,8 +857,8 @@ let create config meta_config message_config io_sched =
let pool = {
config ; meta_config ; message_config ;
my_id_points = Point.Table.create 7 ;
known_gids = Gid.Table.create 53 ;
connected_gids = Gid.Table.create 53 ;
known_peer_ids = Peer_id.Table.create 53 ;
connected_peer_ids = Peer_id.Table.create 53 ;
known_points = Point.Table.create 53 ;
connected_points = Point.Table.create 53 ;
incoming = Point.Table.create 53 ;
@ -868,11 +868,11 @@ let create config meta_config message_config io_sched =
watcher = Watcher.create_input () ;
} in
List.iter (Points.set_trusted pool) config.trusted_points ;
Gid_info.File.load config.peers_file meta_config.encoding >>= function
| Ok gids ->
Peer_info.File.load config.peers_file meta_config.encoding >>= function
| Ok peer_ids ->
(fun gi -> Gid.Table.add pool.known_gids (Gid_info.gid gi) gi)
gids ;
(fun gi -> Peer_id.Table.add pool.known_peer_ids (Peer_info.peer_id gi) gi)
peer_ids ;
Lwt.return pool
| Error err ->
log_error "@[Failed to parsed peers file:@ %a@]"
@ -888,14 +888,14 @@ let destroy pool =
disconnect conn >>= fun () -> acc
| Disconnected -> acc)
pool.known_points @@
Gid.Table.fold (fun _gid gi acc ->
match Gid_info.State.get gi with
Peer_id.Table.fold (fun _peer_id gi acc ->
match Peer_info.State.get gi with
| Accepted { cancel } ->
Canceler.cancel cancel >>= fun () -> acc
| Running { data = conn } ->
disconnect conn >>= fun () -> acc
| Disconnected -> acc)
pool.known_gids @@
pool.known_peer_ids @@
Point.Table.fold (fun _point canceler acc ->
Canceler.cancel canceler >>= fun () -> acc)
pool.incoming Lwt.return_unit
@ -54,7 +54,7 @@ type config = {
peers_file : string ;
(** The path to the JSON file where the metadata associated to
gids are loaded / stored. *)
peer_ids are loaded / stored. *)
closed_network : bool ;
(** If [true], the only accepted connections are from peers whose
@ -93,8 +93,8 @@ type config = {
(** Size of the outgoing message queue internal to a peer's Writer
(See [P2p_connection.accept]). *)
known_gids_history_size : int ;
(** Size of the known gids log buffer (default: 50) *)
known_peer_ids_history_size : int ;
(** Size of the known peer_ids log buffer (default: 50) *)
known_points_history_size : int ;
(** Size of the known points log buffer (default: 50) *)
@ -106,8 +106,8 @@ type config = {
disconnected points, older first, to try to reach the amount of
connections indicated by the second integer. *)
max_known_gids : (int * int) option ;
(** Like [max_known_points], but for known gids. *)
max_known_peer_ids : (int * int) option ;
(** Like [max_known_points], but for known peer_ids. *)
type 'meta meta_config = {
@ -174,7 +174,7 @@ module LogEvent : sig
| Too_many_connections
| New_point of Point.t
| New_peer of Gid.t
| New_peer of Peer_id.t
(** Connection-level events *)
@ -185,25 +185,25 @@ module LogEvent : sig
| Authentication_failed of Point.t
(** Remote point failed authentication *)
| Accepting_request of Point.t * Id_point.t * Gid.t
| Accepting_request of Point.t * Id_point.t * Peer_id.t
(** We accepted a connection after authentifying the remote peer. *)
| Rejecting_request of Point.t * Id_point.t * Gid.t
| Rejecting_request of Point.t * Id_point.t * Peer_id.t
(** We rejected a connection after authentifying the remote peer. *)
| Request_rejected of Point.t * (Id_point.t * Gid.t) option
| Request_rejected of Point.t * (Id_point.t * Peer_id.t) option
(** The remote peer rejected our connection. *)
| Connection_established of Id_point.t * Gid.t
| Connection_established of Id_point.t * Peer_id.t
(** We succesfully established a authentified connection. *)
| Disconnection of Gid.t
| Disconnection of Peer_id.t
(** We decided to close the connection. *)
| External_disconnection of Gid.t
| External_disconnection of Peer_id.t
(** The connection was closed for external reason. *)
| Gc_points
(** Garbage correction of known point table has been triggered. *)
| Gc_gids
(** Garbage correction of known gids table has been triggered. *)
| Gc_peer_ids
(** Garbage correction of known peer_ids table has been triggered. *)
val encoding : t Data_encoding.t
@ -224,7 +224,7 @@ type ('msg, 'meta) connection
type error += Pending_connection
type error += Connected
type error += Connection_refused
type error += Rejected of Gid.t
type error += Rejected of Peer_id.t
type error += Too_many_connections
type error += Closed_network
@ -254,7 +254,7 @@ val connection_stat: ('msg, 'meta) connection -> Stat.t
val fold_connections:
('msg, 'meta) pool ->
init:'a ->
f:(Gid.t -> ('msg, 'meta) connection -> 'a -> 'a) ->
f:(Peer_id.t -> ('msg, 'meta) connection -> 'a -> 'a) ->
(** {1 I/O on connections} *)
@ -291,36 +291,36 @@ val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit
(** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap]
for all member connections to [pool] in [Running] state. *)
(** {1 Functions on [Gid]} *)
(** {1 Functions on [Peer_id]} *)
module Gids : sig
module Peer_ids : sig
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Gid_info.t
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Peer_info.t
val info:
('msg, 'meta) pool -> Gid.t -> ('msg, 'meta) info option
('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) info option
val get_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta option
val set_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta -> unit
val get_score: ('msg, 'meta) pool -> Gid.t -> float option
val get_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta option
val set_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta -> unit
val get_score: ('msg, 'meta) pool -> Peer_id.t -> float option
val get_trusted: ('msg, 'meta) pool -> Gid.t -> bool
val set_trusted: ('msg, 'meta) pool -> Gid.t -> unit
val unset_trusted: ('msg, 'meta) pool -> Gid.t -> unit
val get_trusted: ('msg, 'meta) pool -> Peer_id.t -> bool
val set_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit
val unset_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit
val find_connection:
('msg, 'meta) pool -> Gid.t -> ('msg, 'meta) connection option
('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) connection option
val fold_known:
('msg, 'meta) pool ->
init:'a ->
f:(Gid.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(Peer_id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
val fold_connected:
('msg, 'meta) pool ->
init:'a ->
f:(Gid.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(Peer_id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
@ -13,22 +13,22 @@ module Point_info = struct
type 'data state =
| Requested of { cancel: Canceler.t }
| Accepted of { current_gid: Gid.t ;
| Accepted of { current_peer_id: Peer_id.t ;
cancel: Canceler.t }
| Running of { data: 'data ;
current_gid: Gid.t }
current_peer_id: Peer_id.t }
| Disconnected
module Event = struct
type kind =
| Outgoing_request
| Accepting_request of Gid.t
| Rejecting_request of Gid.t
| Request_rejected of Gid.t option
| Connection_established of Gid.t
| Disconnection of Gid.t
| External_disconnection of Gid.t
| Accepting_request of Peer_id.t
| Rejecting_request of Peer_id.t
| Request_rejected of Peer_id.t option
| Connection_established of Peer_id.t
| Disconnection of Peer_id.t
| External_disconnection of Peer_id.t
let kind_encoding =
let open Data_encoding in
@ -41,29 +41,29 @@ module Point_info = struct
(function Outgoing_request -> Some () | _ -> None)
(fun () -> Outgoing_request) ;
case ~tag:1 (branch_encoding "accepting_request"
(obj1 (req "gid" Gid.encoding)))
(function Accepting_request gid -> Some gid | _ -> None)
(fun gid -> Accepting_request gid) ;
(obj1 (req "peer_id" Peer_id.encoding)))
(function Accepting_request peer_id -> Some peer_id | _ -> None)
(fun peer_id -> Accepting_request peer_id) ;
case ~tag:2 (branch_encoding "rejecting_request"
(obj1 (req "gid" Gid.encoding)))
(function Rejecting_request gid -> Some gid | _ -> None)
(fun gid -> Rejecting_request gid) ;
(obj1 (req "peer_id" Peer_id.encoding)))
(function Rejecting_request peer_id -> Some peer_id | _ -> None)
(fun peer_id -> Rejecting_request peer_id) ;
case ~tag:3 (branch_encoding "request_rejected"
(obj1 (opt "gid" Gid.encoding)))
(function Request_rejected gid -> Some gid | _ -> None)
(fun gid -> Request_rejected gid) ;
(obj1 (opt "peer_id" Peer_id.encoding)))
(function Request_rejected peer_id -> Some peer_id | _ -> None)
(fun peer_id -> Request_rejected peer_id) ;
case ~tag:4 (branch_encoding "rejecting_request"
(obj1 (req "gid" Gid.encoding)))
(function Connection_established gid -> Some gid | _ -> None)
(fun gid -> Connection_established gid) ;
(obj1 (req "peer_id" Peer_id.encoding)))
(function Connection_established peer_id -> Some peer_id | _ -> None)
(fun peer_id -> Connection_established peer_id) ;
case ~tag:5 (branch_encoding "rejecting_request"
(obj1 (req "gid" Gid.encoding)))
(function Disconnection gid -> Some gid | _ -> None)
(fun gid -> Disconnection gid) ;
(obj1 (req "peer_id" Peer_id.encoding)))
(function Disconnection peer_id -> Some peer_id | _ -> None)
(fun peer_id -> Disconnection peer_id) ;
case ~tag:6 (branch_encoding "rejecting_request"
(obj1 (req "gid" Gid.encoding)))
(function External_disconnection gid -> Some gid | _ -> None)
(fun gid -> External_disconnection gid) ;
(obj1 (req "peer_id" Peer_id.encoding)))
(function External_disconnection peer_id -> Some peer_id | _ -> None)
(fun peer_id -> External_disconnection peer_id) ;
type t = {
@ -92,9 +92,9 @@ module Point_info = struct
mutable trusted : bool ;
mutable state : 'data state ;
mutable last_failed_connection : Time.t option ;
mutable last_rejected_connection : (Gid.t * Time.t) option ;
mutable last_established_connection : (Gid.t * Time.t) option ;
mutable last_disconnection : (Gid.t * Time.t) option ;
mutable last_rejected_connection : (Peer_id.t * Time.t) option ;
mutable last_established_connection : (Peer_id.t * Time.t) option ;
mutable last_disconnection : (Peer_id.t * Time.t) option ;
greylisting : greylisting_config ;
mutable greylisting_delay : float ;
mutable greylisting_end : Time.t ;
@ -172,27 +172,27 @@ module Point_info = struct
Ring.add events event ;
Watcher.notify watchers event
let log_incoming_rejection ?timestamp point_info gid =
log point_info ?timestamp (Rejecting_request gid)
let log_incoming_rejection ?timestamp point_info peer_id =
log point_info ?timestamp (Rejecting_request peer_id)
module State = struct
type 'data t = 'data state =
| Requested of { cancel: Canceler.t }
| Accepted of { current_gid: Gid.t ;
| Accepted of { current_peer_id: Peer_id.t ;
cancel: Canceler.t }
| Running of { data: 'data ;
current_gid: Gid.t }
current_peer_id: Peer_id.t }
| Disconnected
type 'data state = 'data t
let pp ppf = function
| Requested _ ->
Format.fprintf ppf "requested"
| Accepted { current_gid } ->
Format.fprintf ppf "accepted %a" Gid.pp current_gid
| Running { current_gid } ->
Format.fprintf ppf "running %a" Gid.pp current_gid
| Accepted { current_peer_id } ->
Format.fprintf ppf "accepted %a" Peer_id.pp current_peer_id
| Running { current_peer_id } ->
Format.fprintf ppf "running %a" Peer_id.pp current_peer_id
| Disconnected ->
Format.fprintf ppf "disconnected"
@ -215,29 +215,29 @@ module Point_info = struct
let set_accepted
?(timestamp = ())
point_info current_gid cancel =
point_info current_peer_id cancel =
(* log_notice "SET_ACCEPTED %a@." Point.pp point_info.point ; *)
assert begin
match point_info.state with
| Accepted _ | Running _ -> false
| Requested _ | Disconnected -> true
end ;
point_info.state <- Accepted { current_gid ; cancel } ;
log point_info ~timestamp (Accepting_request current_gid)
point_info.state <- Accepted { current_peer_id ; cancel } ;
log point_info ~timestamp (Accepting_request current_peer_id)
let set_running
?(timestamp = ())
point_info gid data =
point_info peer_id data =
assert begin
match point_info.state with
| Disconnected -> true (* request to unknown gid. *)
| Disconnected -> true (* request to unknown peer_id. *)
| Running _ -> false
| Accepted { current_gid } -> Gid.equal gid current_gid
| Accepted { current_peer_id } -> Peer_id.equal peer_id current_peer_id
| Requested _ -> true
end ;
point_info.state <- Running { data ; current_gid = gid } ;
point_info.last_established_connection <- Some (gid, timestamp) ;
log point_info ~timestamp (Connection_established gid)
point_info.state <- Running { data ; current_peer_id = peer_id } ;
point_info.last_established_connection <- Some (peer_id, timestamp) ;
log point_info ~timestamp (Connection_established peer_id)
let set_greylisted timestamp point_info =
point_info.greylisting_end <-
@ -255,21 +255,21 @@ module Point_info = struct
set_greylisted timestamp point_info ;
point_info.last_failed_connection <- Some timestamp ;
Request_rejected None
| Accepted { current_gid } ->
| Accepted { current_peer_id } ->
set_greylisted timestamp point_info ;
point_info.last_rejected_connection <-
Some (current_gid, timestamp) ;
Request_rejected (Some current_gid)
| Running { current_gid } ->
Some (current_peer_id, timestamp) ;
Request_rejected (Some current_peer_id)
| Running { current_peer_id } ->
point_info.greylisting_delay <-
float_of_int point_info.greylisting.initial_delay ;
point_info.greylisting_end <-
Time.add timestamp
(Int64.of_int point_info.greylisting.disconnection_delay) ;
point_info.last_disconnection <- Some (current_gid, timestamp) ;
point_info.last_disconnection <- Some (current_peer_id, timestamp) ;
if requested
then Disconnection current_gid
else External_disconnection current_gid
then Disconnection current_peer_id
else External_disconnection current_peer_id
| Disconnected ->
assert false
@ -280,7 +280,7 @@ module Point_info = struct
module Gid_info = struct
module Peer_info = struct
type 'data state =
| Accepted of { current_point: Id_point.t ;
@ -333,7 +333,7 @@ module Gid_info = struct
type ('conn, 'meta) t = {
gid : Gid.t ;
peer_id : Peer_id.t ;
created : Time.t ;
mutable state : 'conn state ;
mutable metadata : 'meta ;
@ -345,14 +345,14 @@ module Gid_info = struct
events : Event.t Ring.t ;
watchers : Event.t Watcher.input ;
type ('conn, 'meta) gid_info = ('conn, 'meta) t
type ('conn, 'meta) peer_info = ('conn, 'meta) t
let compare gi1 gi2 = gi1.gid gi2.gid
let compare gi1 gi2 = gi1.peer_id gi2.peer_id
let log_size = 100
let create ?(created = ()) ?(trusted = false) ~metadata gid =
{ gid ;
let create ?(created = ()) ?(trusted = false) ~metadata peer_id =
{ peer_id ;
created ;
state = Disconnected ;
metadata ;
@ -368,20 +368,20 @@ module Gid_info = struct
let encoding metadata_encoding =
let open Data_encoding in
(fun { gid ; trusted ; metadata ; events ; created ;
(fun { peer_id ; trusted ; metadata ; events ; created ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection } ->
(gid, created, trusted, metadata, Ring.elements events,
(peer_id, created, trusted, metadata, Ring.elements events,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection))
(fun (gid, created, trusted, metadata, event_list,
(fun (peer_id, created, trusted, metadata, event_list,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection) ->
let info = create ~trusted ~metadata gid in
let info = create ~trusted ~metadata peer_id in
let events = Ring.create log_size in
Ring.add_list event_list ;
{ state = Disconnected ;
trusted ; gid ; metadata ; created ;
trusted ; peer_id ; metadata ; created ;
last_failed_connection ;
last_rejected_connection ;
last_established_connection ;
@ -390,7 +390,7 @@ module Gid_info = struct
watchers = Watcher.create_input () ;
(req "gid" Gid.encoding)
(req "peer_id" Peer_id.encoding)
(req "created" Time.encoding)
(dft "trusted" bool false)
(req "metadata" metadata_encoding)
@ -404,7 +404,7 @@ module Gid_info = struct
(opt "last_disconnection"
(tup2 Id_point.encoding Time.encoding)))
let gid { gid } = gid
let peer_id { peer_id } = peer_id
let created { created } = created
let metadata { metadata } = metadata
let set_metadata gi metadata = gi.metadata <- metadata
@ -435,8 +435,8 @@ module Gid_info = struct
let watch { watchers } = Watcher.create_stream watchers
let log_incoming_rejection ?timestamp gid_info point =
log gid_info ?timestamp point Rejecting_request
let log_incoming_rejection ?timestamp peer_info point =
log peer_info ?timestamp point Rejecting_request
module State = struct
@ -465,46 +465,46 @@ module Gid_info = struct
let set_accepted
?(timestamp = ())
gid_info current_point cancel =
peer_info current_point cancel =
assert begin
match gid_info.state with
match peer_info.state with
| Accepted _ | Running _ -> false
| Disconnected -> true
end ;
gid_info.state <- Accepted { current_point ; cancel } ;
log gid_info ~timestamp current_point Accepting_request
peer_info.state <- Accepted { current_point ; cancel } ;
log peer_info ~timestamp current_point Accepting_request
let set_running
?(timestamp = ())
gid_info point data =
peer_info point data =
assert begin
match gid_info.state with
| Disconnected -> true (* request to unknown gid. *)
match peer_info.state with
| Disconnected -> true (* request to unknown peer_id. *)
| Running _ -> false
| Accepted { current_point } ->
Id_point.equal point current_point
end ;
gid_info.state <- Running { data ; current_point = point } ;
gid_info.last_established_connection <- Some (point, timestamp) ;
log gid_info ~timestamp point Connection_established
peer_info.state <- Running { data ; current_point = point } ;
peer_info.last_established_connection <- Some (point, timestamp) ;
log peer_info ~timestamp point Connection_established
let set_disconnected
?(timestamp = ()) ?(requested = false) gid_info =
?(timestamp = ()) ?(requested = false) peer_info =
let current_point, (event : Event.kind) =
match gid_info.state with
match peer_info.state with
| Accepted { current_point } ->
gid_info.last_rejected_connection <-
peer_info.last_rejected_connection <-
Some (current_point, timestamp) ;
current_point, Request_rejected
| Running { current_point } ->
gid_info.last_disconnection <-
peer_info.last_disconnection <-
Some (current_point, timestamp) ;
if requested then Disconnection else External_disconnection
| Disconnected -> assert false
gid_info.state <- Disconnected ;
log gid_info ~timestamp current_point event
peer_info.state <- Disconnected ;
log peer_info ~timestamp current_point event
@ -41,14 +41,14 @@ module Point_info : sig
val last_failed_connection :
'conn point_info -> Time.t option
val last_rejected_connection :
'conn point_info -> (Gid.t * Time.t) option
'conn point_info -> (Peer_id.t * Time.t) option
val last_established_connection :
'conn point_info -> (Gid.t * Time.t) option
'conn point_info -> (Peer_id.t * Time.t) option
val last_disconnection :
'conn point_info -> (Gid.t * Time.t) option
'conn point_info -> (Peer_id.t * Time.t) option
val last_seen :
'conn point_info -> (Gid.t * Time.t) option
'conn point_info -> (Peer_id.t * Time.t) option
(** [last_seen pi] is the most recent of:
* last established connection
@ -77,11 +77,11 @@ module Point_info : sig
type 'conn t =
| Requested of { cancel: Canceler.t }
(** We initiated a connection. *)
| Accepted of { current_gid: Gid.t ;
| Accepted of { current_peer_id: Peer_id.t ;
cancel: Canceler.t }
(** We accepted a incoming connection. *)
| Running of { data: 'conn ;
current_gid: Gid.t }
current_peer_id: Peer_id.t }
(** Successfully authentificated connection, normal business. *)
| Disconnected
(** No connection established currently. *)
@ -99,10 +99,10 @@ module Point_info : sig
val set_accepted :
?timestamp:Time.t ->
'conn point_info -> Gid.t -> Canceler.t -> unit
'conn point_info -> Peer_id.t -> Canceler.t -> unit
val set_running :
?timestamp:Time.t -> 'conn point_info -> Gid.t -> 'conn -> unit
?timestamp:Time.t -> 'conn point_info -> Peer_id.t -> 'conn -> unit
val set_disconnected :
?timestamp:Time.t -> ?requested:bool -> 'conn point_info -> unit
@ -114,17 +114,17 @@ module Point_info : sig
type kind =
| Outgoing_request
(** We initiated a connection. *)
| Accepting_request of Gid.t
| Accepting_request of Peer_id.t
(** We accepted a connection after authentifying the remote peer. *)
| Rejecting_request of Gid.t
| Rejecting_request of Peer_id.t
(** We rejected a connection after authentifying the remote peer. *)
| Request_rejected of Gid.t option
| Request_rejected of Peer_id.t option
(** The remote peer rejected our connection. *)
| Connection_established of Gid.t
| Connection_established of Peer_id.t
(** We succesfully established a authentified connection. *)
| Disconnection of Gid.t
| Disconnection of Peer_id.t
(** We decided to close the connection. *)
| External_disconnection of Gid.t
| External_disconnection of Peer_id.t
(** The connection was closed for external reason. *)
type t = {
@ -142,17 +142,17 @@ module Point_info : sig
'conn point_info -> Event.t Lwt_stream.t * Watcher.stopper
val log_incoming_rejection :
?timestamp:Time.t -> 'conn point_info -> Gid.t -> unit
?timestamp:Time.t -> 'conn point_info -> Peer_id.t -> unit
(** Gid info: current and historical information about a gid *)
(** Peer_id info: current and historical information about a peer_id *)
module Gid_info : sig
module Peer_info : sig
type ('conn, 'meta) t
type ('conn, 'meta) gid_info = ('conn, 'meta) t
type ('conn, 'meta) peer_info = ('conn, 'meta) t
val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int
@ -160,31 +160,31 @@ module Gid_info : sig
?created:Time.t ->
?trusted:bool ->
metadata:'meta ->
Gid.t -> ('conn, 'meta) gid_info
(** [create ~trusted ~meta gid] is a freshly minted gid info for
[gid]. *)
Peer_id.t -> ('conn, 'meta) peer_info
(** [create ~trusted ~meta peer_id] is a freshly minted peer_id info for
[peer_id]. *)
val gid : ('conn, 'meta) gid_info -> Gid.t
val peer_id : ('conn, 'meta) peer_info -> Peer_id.t
val created : ('conn, 'meta) gid_info -> Time.t
val metadata : ('conn, 'meta) gid_info -> 'meta
val set_metadata : ('conn, 'meta) gid_info -> 'meta -> unit
val created : ('conn, 'meta) peer_info -> Time.t
val metadata : ('conn, 'meta) peer_info -> 'meta
val set_metadata : ('conn, 'meta) peer_info -> 'meta -> unit
val trusted : ('conn, 'meta) gid_info -> bool
val set_trusted : ('conn, 'meta) gid_info -> unit
val unset_trusted : ('conn, 'meta) gid_info -> unit
val trusted : ('conn, 'meta) peer_info -> bool
val set_trusted : ('conn, 'meta) peer_info -> unit
val unset_trusted : ('conn, 'meta) peer_info -> unit
val last_failed_connection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
('conn, 'meta) peer_info -> (Id_point.t * Time.t) option
val last_rejected_connection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
('conn, 'meta) peer_info -> (Id_point.t * Time.t) option
val last_established_connection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
('conn, 'meta) peer_info -> (Id_point.t * Time.t) option
val last_disconnection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
('conn, 'meta) peer_info -> (Id_point.t * Time.t) option
val last_seen :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
('conn, 'meta) peer_info -> (Id_point.t * Time.t) option
(** [last_seen gi] is the most recent of:
* last established connection
@ -193,7 +193,7 @@ module Gid_info : sig
val last_miss :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
('conn, 'meta) peer_info -> (Id_point.t * Time.t) option
(** [last_miss gi] is the most recent of:
* last failed connection
@ -217,22 +217,22 @@ module Gid_info : sig
val pp : Format.formatter -> 'conn t -> unit
val get : ('conn, 'meta) gid_info -> 'conn state
val get : ('conn, 'meta) peer_info -> 'conn state
val is_disconnected : ('conn, 'meta) gid_info -> bool
val is_disconnected : ('conn, 'meta) peer_info -> bool
val set_accepted :
?timestamp:Time.t ->
('conn, 'meta) gid_info -> Id_point.t -> Canceler.t -> unit
('conn, 'meta) peer_info -> Id_point.t -> Canceler.t -> unit
val set_running :
?timestamp:Time.t ->
('conn, 'meta) gid_info -> Id_point.t -> 'conn -> unit
('conn, 'meta) peer_info -> Id_point.t -> 'conn -> unit
val set_disconnected :
?timestamp:Time.t ->
?requested:bool ->
('conn, 'meta) gid_info -> unit
('conn, 'meta) peer_info -> unit
@ -262,22 +262,22 @@ module Gid_info : sig
val fold_events :
('conn, 'meta) gid_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a
('conn, 'meta) peer_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a
val watch :
('conn, 'meta) gid_info -> Event.t Lwt_stream.t * Watcher.stopper
('conn, 'meta) peer_info -> Event.t Lwt_stream.t * Watcher.stopper
val log_incoming_rejection :
?timestamp:Time.t ->
('conn, 'meta) gid_info -> Id_point.t -> unit
('conn, 'meta) peer_info -> Id_point.t -> unit
module File : sig
val load :
string -> 'meta Data_encoding.t ->
('conn, 'meta) gid_info list tzresult Lwt.t
('conn, 'meta) peer_info list tzresult Lwt.t
val save :
string -> 'meta Data_encoding.t ->
('conn, 'meta) gid_info list -> unit tzresult Lwt.t
('conn, 'meta) peer_info list -> unit tzresult Lwt.t
@ -20,19 +20,19 @@ let inet_addr = Unix.inet_addr_of_string "ff0e::54:455a:3053"
module Message = struct
let encoding =
Data_encoding.(tup3 (Fixed.string 10) Gid.encoding int16)
Data_encoding.(tup3 (Fixed.string 10) Peer_id.encoding int16)
let length = Data_encoding.Binary.fixed_length_exn encoding
let make gid port =
Data_encoding.Binary.to_bytes encoding ("DISCOMAGIC", gid, port)
let make peer_id port =
Data_encoding.Binary.to_bytes encoding ("DISCOMAGIC", peer_id, port)
(* Sends discover messages into space in an exponentially delayed loop,
restartable using a condition *)
let sender sock saddr my_gid inco_port cancelation restart =
let buf = Message.make my_gid inco_port in
let sender sock saddr my_peer_id inco_port cancelation restart =
let buf = Message.make my_peer_id inco_port in
let rec loop delay n =
(fun () ->
@ -40,7 +40,7 @@ let sender sock saddr my_gid inco_port cancelation restart =
(fun exn ->
lwt_debug "(%a) error broadcasting a discovery request: %a"
Gid.pp my_gid Error_monad.pp (Exn exn)) >>= fun () ->
Peer_id.pp my_peer_id Error_monad.pp (Exn exn)) >>= fun () ->
[ (Lwt_unix.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ;
(cancelation () >>= fun () -> Lwt.return_none) ;
@ -66,7 +66,7 @@ module Answerer = struct
(* Launch an answer machine for the discovery mechanism, takes a
callback to fill the answers and returns a canceler function *)
let answerer sock my_gid cancelation callback =
let answerer sock my_peer_id cancelation callback =
(* the answering function *)
let buf = MBytes.create Message.length in
let rec step () =
@ -78,8 +78,8 @@ module Answerer = struct
| Some (len', Lwt_unix.ADDR_INET (remote_addr, _mcast_port))
when len' = Message.length -> begin
match (Data_encoding.Binary.of_bytes Message.encoding buf) with
| Some ("DISCOMAGIC", remote_gid, remote_inco_port)
when remote_gid <> my_gid ->
| Some ("DISCOMAGIC", remote_peer_id, remote_inco_port)
when remote_peer_id <> my_peer_id ->
(fun () -> callback ~remote_addr ~remote_inco_port)
(fun exn ->
@ -101,8 +101,8 @@ module Answerer = struct
(fun () ->
(Format.asprintf "(%a) discovery answerer" Gid.pp my_gid)
(fun () -> answerer fd my_gid cancelation callback)
(Format.asprintf "(%a) discovery answerer" Peer_id.pp my_peer_id)
(fun () -> answerer fd my_peer_id cancelation callback)
(fun exn ->
lwt_log_error "Discovery answerer not started: %a"
@ -117,9 +117,9 @@ let discovery_sender =
(fun () ->
let sender () =
Discovery.sender fd
saddr my_gid inco_port cancelation restart_discovery in
saddr my_peer_id inco_port cancelation restart_discovery in
(Format.asprintf "(%a) discovery sender" Gid.pp my_gid)
(Format.asprintf "(%a) discovery sender" Peer_id.pp my_peer_id)
sender cancel)
(fun exn ->
lwt_log_error "Discovery sender not started: %a"
@ -489,7 +489,7 @@ let shutdown ?timeout st =
st.closed <- true ;
ReadScheduler.shutdown st.read_scheduler >>= fun () ->
(fun _gid conn acc -> close ?timeout conn >>= fun _ -> acc)
(fun _peer_id conn acc -> close ?timeout conn >>= fun _ -> acc)
Lwt.return_unit >>= fun () ->
WriteScheduler.shutdown st.write_scheduler >>= fun () ->
@ -101,7 +101,7 @@ module Stat = struct
(req "current_outflow" int31))
module Gid = struct
module Peer_id = struct
include Crypto_box.Public_key_hash
let pp = pp_short
module Map = Map.Make (Crypto_box.Public_key_hash)
@ -162,10 +162,6 @@ module Point = struct
include T
(* Run-time point-or-gid indexed storage, one point is bound to at
most one gid, which is the invariant we want to keep both for the
connected peers table and the known peers one *)
module Map = Map.Make (T)
module Set = Set.Make (T)
module Table = Hashtbl.Make (T)
@ -220,10 +216,6 @@ module Id_point = struct
include T
(* Run-time point-or-gid indexed storage, one point is bound to at
most one gid, which is the invariant we want to keep both for the
connected peers table and the known peers one *)
module Map = Map.Make (T)
module Set = Set.Make (T)
module Table = Hashtbl.Make (T)
@ -233,7 +225,7 @@ end
module Identity = struct
type t = {
gid : Gid.t ;
peer_id : Peer_id.t ;
public_key : Crypto_box.public_key ;
secret_key : Crypto_box.secret_key ;
proof_of_work_stamp : Crypto_box.nonce ;
@ -245,18 +237,18 @@ module Identity = struct
(fun { public_key ; secret_key ; proof_of_work_stamp } ->
(public_key, secret_key, proof_of_work_stamp))
(fun (public_key, secret_key, proof_of_work_stamp) ->
let gid = Crypto_box.hash public_key in
{ gid ; public_key ; secret_key ; proof_of_work_stamp })
let peer_id = Crypto_box.hash public_key in
{ peer_id ; public_key ; secret_key ; proof_of_work_stamp })
(req "public_key" Crypto_box.public_key_encoding)
(req "secret_key" Crypto_box.secret_key_encoding)
(req "proof_of_work_stamp" Crypto_box.nonce_encoding))
let generate ?max target =
let secret_key, public_key, gid = Crypto_box.random_keypair () in
let secret_key, public_key, peer_id = Crypto_box.random_keypair () in
let proof_of_work_stamp =
Crypto_box.generate_proof_of_work ?max public_key target in
{ gid ; public_key ; secret_key ; proof_of_work_stamp }
{ peer_id ; public_key ; secret_key ; proof_of_work_stamp }
let animation = [|
"|.....|" ;
@ -307,7 +299,7 @@ module Connection_info = struct
type t = {
incoming : bool;
gid : Gid.t;
peer_id : Peer_id.t;
id_point : Id_point.t;
remote_socket_port : port;
versions : Version.t list ;
@ -316,26 +308,26 @@ module Connection_info = struct
let encoding =
let open Data_encoding in
(fun { incoming ; gid ; id_point ; remote_socket_port ; versions } ->
(incoming, gid, id_point, remote_socket_port, versions))
(fun (incoming, gid, id_point, remote_socket_port, versions) ->
{ incoming ; gid ; id_point ; remote_socket_port ; versions })
(fun { incoming ; peer_id ; id_point ; remote_socket_port ; versions } ->
(incoming, peer_id, id_point, remote_socket_port, versions))
(fun (incoming, peer_id, id_point, remote_socket_port, versions) ->
{ incoming ; peer_id ; id_point ; remote_socket_port ; versions })
(req "incoming" bool)
(req "gid" Gid.encoding)
(req "peer_id" Peer_id.encoding)
(req "id_point" Id_point.encoding)
(req "remote_socket_port" uint16)
(req "versions" (list Version.encoding)))
let pp ppf
{ incoming ; id_point = (remote_addr, remote_port) ; gid } =
{ incoming ; id_point = (remote_addr, remote_port) ; peer_id } =
Format.fprintf ppf "%a:%a {%a}%s"
Ipaddr.V6.pp_hum remote_addr
(fun ppf port ->
match port with
| None -> Format.pp_print_string ppf "??"
| Some port -> Format.pp_print_int ppf port) remote_port
Gid.pp gid
Peer_id.pp peer_id
(if incoming then " (incoming)" else "")
@ -24,11 +24,11 @@ module Version : sig
(** Gid, i.e. persistent peer identifier *)
(** Peer_id, i.e. persistent peer identifier *)
module Gid : sig
module Peer_id : sig
type t = Crypto_box.Public_key_hash.t
(** Type of a gid, a public key hash. *)
(** Type of a peer_id, a public key hash. *)
val compare : t -> t -> int
val equal : t -> t -> bool
@ -86,12 +86,12 @@ end
module Identity : sig
type t = {
gid : Gid.t ;
peer_id : Peer_id.t ;
public_key : Crypto_box.public_key ;
secret_key : Crypto_box.secret_key ;
proof_of_work_stamp : Crypto_box.nonce ;
(** Type of an identity, comprising a gid, a crypto keypair, and a
(** Type of an identity, comprising a peer_id, a crypto keypair, and a
proof of work stamp with enough difficulty so that the network
accept this identity as genuine. *)
@ -131,7 +131,7 @@ module Connection_info : sig
type t = {
incoming : bool;
gid : Gid.t;
peer_id : Peer_id.t;
id_point : Id_point.t;
remote_socket_port : port;
versions : Version.t list ;
@ -632,18 +632,18 @@ module RPC = struct
|||| node.p2p
module Gid = struct
module Peer_id = struct
let info (node : t) =
|||| node.p2p
|||| node.p2p
let infos (node : t) restrict =
Tezos_p2p.RPC.Gid.infos ~restrict node.p2p
Tezos_p2p.RPC.Peer_id.infos ~restrict node.p2p
let events (node : t) =
|||| node.p2p
|||| node.p2p
let watch (node : t) =
|||| node.p2p
|||| node.p2p
@ -86,19 +86,19 @@ module RPC : sig
val connect : t -> P2p.Point.t -> float -> unit tzresult Lwt.t
module Connection : sig
val info : t -> P2p.Gid.t -> P2p.Connection_info.t option
val kick : t -> P2p.Gid.t -> bool -> unit Lwt.t
val info : t -> P2p.Peer_id.t -> P2p.Connection_info.t option
val kick : t -> P2p.Peer_id.t -> bool -> unit Lwt.t
val list : t -> P2p.Connection_info.t list
val count : t -> int
module Gid : sig
module Peer_id : sig
val infos : t ->
P2p.RPC.Gid.state list -> (P2p.Gid.t * list
val info : t -> P2p.Gid.t -> option
val events : t -> P2p.Gid.t -> P2p.RPC.Gid.Event.t list
val watch : t -> P2p.Gid.t ->
P2p.RPC.Gid.Event.t Lwt_stream.t * Watcher.stopper
P2p.RPC.Peer_id.state list -> (P2p.Peer_id.t * list
val info : t -> P2p.Peer_id.t -> option
val events : t -> P2p.Peer_id.t -> P2p.RPC.Peer_id.Event.t list
val watch : t -> P2p.Peer_id.t ->
P2p.RPC.Peer_id.Event.t Lwt_stream.t * Watcher.stopper
module Point : sig
@ -464,32 +464,32 @@ let build_rpc_directory node =
(* Network : Connection *)
let dir =
let implementation gid () =
|||| node gid |> RPC.Answer.return in
let implementation peer_id () =
|||| node peer_id |> RPC.Answer.return in
RPC.register1 dir implementation in
let dir =
let implementation gid wait =
Node.RPC.Network.Connection.kick node gid wait >>= RPC.Answer.return in
let implementation peer_id wait =
Node.RPC.Network.Connection.kick node peer_id wait >>= RPC.Answer.return in
RPC.register1 dir Services.Network.Connection.kick implementation in
let dir =
let implementation () =
Node.RPC.Network.Connection.list node |> RPC.Answer.return in
RPC.register0 dir Services.Network.Connection.list implementation in
(* Network : Gid *)
(* Network : Peer_id *)
let dir =
let implementation state =
Node.RPC.Network.Gid.infos node state |> RPC.Answer.return in
RPC.register0 dir Services.Network.Gid.infos implementation in
Node.RPC.Network.Peer_id.infos node state |> RPC.Answer.return in
RPC.register0 dir Services.Network.Peer_id.infos implementation in
let dir =
let implementation gid () =
|||| node gid |> RPC.Answer.return in
RPC.register1 dir implementation in
let implementation peer_id () =
|||| node peer_id |> RPC.Answer.return in
RPC.register1 dir implementation in
let dir =
let implementation gid monitor =
let implementation peer_id monitor =
if monitor then
let stream, stopper = node gid in
let stream, stopper = node peer_id in
let shutdown () = Watcher.shutdown stopper in
let first_request = ref true in
let next () =
@ -497,12 +497,12 @@ let build_rpc_directory node =
Lwt_stream.get stream >|= map_option ~f:(fun i -> [i])
end else begin
first_request := false ;
Lwt.return_some @@ node gid
Lwt.return_some @@ node peer_id
end in
RPC.Answer.return_stream { next ; shutdown }
|||| node gid |> RPC.Answer.return in
RPC.register1 dir implementation in
|||| node peer_id |> RPC.Answer.return in
RPC.register1 dir implementation in
(* Network : Point *)
@ -486,9 +486,9 @@ end
module Network = struct
open P2p_types
let (gid_arg : P2p_types.Gid.t RPC.Arg.arg) =
let (peer_id_arg : P2p_types.Peer_id.t RPC.Arg.arg) =
~descr:"A network global identifier, also known as an identity."
~destruct:(fun s -> try
Ok (Crypto_box.Public_key_hash.of_b58check s)
@ -538,13 +538,13 @@ module Network = struct
~input: empty
~output: (option P2p.Connection_info.encoding)
RPC.Path.(root / "network" / "connection" /: gid_arg)
RPC.Path.(root / "network" / "connection" /: peer_id_arg)
let kick =
~input: (obj1 (req "wait" bool))
~output: empty
RPC.Path.(root / "network" / "connection" /: gid_arg / "kick")
RPC.Path.(root / "network" / "connection" /: peer_id_arg / "kick")
module Point = struct
@ -569,26 +569,26 @@ module Network = struct
RPC.Path.(root / "network" / "point" /: point_arg / "log")
module Gid = struct
module Peer_id = struct
let infos =
let filter =
obj1 (dft "filter" (list P2p.RPC.Gid.state_encoding) []) in
obj1 (dft "filter" (list P2p.RPC.Peer_id.state_encoding) []) in
~input: filter
~output: (list (tup2 P2p.Gid.encoding P2p.RPC.Gid.info_encoding))
RPC.Path.(root / "network" / "gid")
~output: (list (tup2 P2p.Peer_id.encoding P2p.RPC.Peer_id.info_encoding))
RPC.Path.(root / "network" / "peer_id")
let info =
~input: empty
~output: (option P2p.RPC.Gid.info_encoding)
RPC.Path.(root / "network" / "gid" /: gid_arg)
~output: (option P2p.RPC.Peer_id.info_encoding)
RPC.Path.(root / "network" / "peer_id" /: peer_id_arg)
let events =
~input: monitor_encoding
~output: (list P2p.RPC.Gid.Event.encoding)
RPC.Path.(root / "network" / "gid" /: gid_arg / "log")
~output: (list P2p.RPC.Peer_id.Event.encoding)
RPC.Path.(root / "network" / "peer_id" /: peer_id_arg / "log")
@ -133,9 +133,9 @@ module Network : sig
val list :
(unit, unit, unit, P2p.Connection_info.t list) RPC.service
val info :
(unit, unit * P2p.Gid.t, unit, P2p.Connection_info.t option) RPC.service
(unit, unit * P2p.Peer_id.t, unit, P2p.Connection_info.t option) RPC.service
val kick :
(unit, unit * P2p.Gid.t, bool, unit) RPC.service
(unit, unit * P2p.Peer_id.t, bool, unit) RPC.service
module Point : sig
@ -148,14 +148,14 @@ module Network : sig
(unit, unit * P2p.Point.t, bool, P2p.RPC.Point.Event.t list) RPC.service
module Gid : sig
module Peer_id : sig
val infos :
(unit, unit, P2p.RPC.Gid.state list,
(P2p.Gid.t * list) RPC.service
(unit, unit, P2p.RPC.Peer_id.state list,
(P2p.Peer_id.t * list) RPC.service
val info :
(unit, unit * P2p.Gid.t, unit, option) RPC.service
(unit, unit * P2p.Peer_id.t, unit, option) RPC.service
val events :
(unit, unit * P2p.Gid.t, bool, P2p.RPC.Gid.Event.t list) RPC.service
(unit, unit * P2p.Peer_id.t, bool, P2p.RPC.Peer_id.Event.t list) RPC.service
@ -163,13 +163,13 @@ module RPC = struct
let watch =
module Gid = struct
type info =
module Event = P2p_connection_pool_types.Gid_info.Event
module Peer_id = struct
type info =
module Event = P2p_connection_pool_types.Peer_info.Event
let info =
let events =
let infos = P2p.RPC.Gid.infos
let watch =
let info =
let events =
let infos = P2p.RPC.Peer_id.infos
let watch =
@ -25,8 +25,8 @@ type connection
(** Access the domain of active connections *)
val connections : net -> connection list
(** Return the active connection with identity [gid] *)
val find_connection : net -> Gid.t -> connection option
(** Return the active connection with identity [peer_id] *)
val find_connection : net -> Peer_id.t -> connection option
(** Access the info of an active connection. *)
val connection_info : net -> connection -> Connection_info.t
@ -35,8 +35,8 @@ val connection_info : net -> connection -> Connection_info.t
type metadata = unit
val get_metadata : net -> Gid.t -> metadata option
val set_metadata : net -> Gid.t -> metadata -> unit
val get_metadata : net -> Peer_id.t -> metadata option
val set_metadata : net -> Peer_id.t -> metadata -> unit
type net_id = Store.net_id
@ -91,8 +91,8 @@ module RPC : sig
val connect : net -> Point.t -> float -> unit tzresult Lwt.t
module Connection : sig
val info : net -> Gid.t -> Connection_info.t option
val kick : net -> Gid.t -> bool -> unit Lwt.t
val info : net -> Peer_id.t -> Connection_info.t option
val kick : net -> Peer_id.t -> bool -> unit Lwt.t
val list : net -> Connection_info.t list
val count : net -> int
@ -107,13 +107,13 @@ module RPC : sig
val watch : net -> Point.t -> Event.t Lwt_stream.t * Watcher.stopper
module Gid : sig
open P2p.RPC.Gid
module Peer_id : sig
open P2p.RPC.Peer_id
module Event = Event
val info : net -> Gid.t -> info option
val events : ?max:int -> ?rev:bool -> net -> Gid.t -> Event.t list
val infos : ?restrict:state list -> net -> (Gid.t * info) list
val watch : net -> Gid.t -> Event.t Lwt_stream.t * Watcher.stopper
val info : net -> Peer_id.t -> info option
val events : ?max:int -> ?rev:bool -> net -> Peer_id.t -> Event.t list
val infos : ?restrict:state list -> net -> (Peer_id.t * info) list
val watch : net -> Peer_id.t -> Event.t Lwt_stream.t * Watcher.stopper
@ -52,7 +52,7 @@ let default = {
time_between_slots =
(* One minute in seconds *)
60L ;
10L ;
first_free_mining_slot = 16l ;
max_signing_slot = 15 ;
instructions_per_transaction = 16 * 1024 ;
@ -74,7 +74,7 @@ let connect sched addr port id =
~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) ->
assert (not info.incoming) ;
assert ( info.gid id1.gid = 0) ;
assert ( info.peer_id id1.peer_id = 0) ;
return auth_fd
let simple_msg =
@ -108,7 +108,7 @@ let server main_socket =
accept sched main_socket >>=? fun (info, auth_fd) ->
lwt_log_notice "Kick" >>= fun () ->
assert (info.incoming) ;
assert ( info.gid id2.gid = 0) ;
assert ( info.peer_id id2.peer_id = 0) ;
P2p_connection.kick auth_fd >>= fun () ->
lwt_log_notice "Kick OK" >>= fun () ->
(* Let's be rejected. *)
@ -144,13 +144,13 @@ let make_net points repeat n =
incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
known_gids_history_size = 100 ;
known_peer_ids_history_size = 100 ;
known_points_history_size = 100 ;
max_known_points = None ;
max_known_gids = None ;
max_known_peer_ids = None ;
} in
~prefix:(Format.asprintf "%a " Gid.pp identity.gid)
~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id)
begin fun () ->
run_net config repeat points (fst point) (snd point) >>= function
| Ok () -> Lwt.return_unit
Reference in New Issue
Block a user