P2p: tag known peers as private until the first connection
This information will be needed to be able to know if a point is/was eventually private.
This commit is contained in:
parent
4a16b3ef0b
commit
bcb56331b5
@ -43,6 +43,7 @@ module Info = struct
|
||||
mutable last_failed_connection : Time.t option ;
|
||||
mutable last_rejected_connection : (P2p_peer.Id.t * Time.t) option ;
|
||||
mutable last_established_connection : (P2p_peer.Id.t * Time.t) option ;
|
||||
mutable known_public : bool ;
|
||||
mutable last_disconnection : (P2p_peer.Id.t * Time.t) option ;
|
||||
greylisting : greylisting_config ;
|
||||
mutable greylisting_delay : float ;
|
||||
@ -72,6 +73,7 @@ module Info = struct
|
||||
last_rejected_connection = None ;
|
||||
last_established_connection = None ;
|
||||
last_disconnection = None ;
|
||||
known_public = false ;
|
||||
events = Ring.create log_size ;
|
||||
greylisting = greylisting_config ;
|
||||
greylisting_delay = 1. ;
|
||||
@ -87,6 +89,7 @@ module Info = struct
|
||||
let last_disconnection s = s.last_disconnection
|
||||
let last_failed_connection s = s.last_failed_connection
|
||||
let last_rejected_connection s = s.last_rejected_connection
|
||||
let known_public s = s.known_public
|
||||
let greylisted ?(now = Time.now ()) s =
|
||||
Time.compare now s.greylisting_end <= 0
|
||||
let greylisted_until s = s.greylisting_end
|
||||
@ -151,7 +154,7 @@ let set_accepted
|
||||
|
||||
let set_running
|
||||
?(timestamp = Time.now ())
|
||||
point_info peer_id data =
|
||||
~known_private point_info peer_id data =
|
||||
assert begin
|
||||
match point_info.Info.state with
|
||||
| Disconnected -> true (* request to unknown peer_id. *)
|
||||
@ -160,6 +163,7 @@ let set_running
|
||||
| Requested _ -> true
|
||||
end ;
|
||||
point_info.state <- Running { data ; current_peer_id = peer_id } ;
|
||||
point_info.known_public <- not known_private ;
|
||||
point_info.last_established_connection <- Some (peer_id, timestamp) ;
|
||||
Info.log point_info ~timestamp (Connection_established peer_id)
|
||||
|
||||
|
@ -50,6 +50,8 @@ module Info : sig
|
||||
(** [trusted pi] is [true] iff [pi] has is trusted,
|
||||
i.e. "whitelisted". *)
|
||||
|
||||
val known_public : 'conn point_info -> bool
|
||||
|
||||
val set_trusted : 'conn point_info -> unit
|
||||
val unset_trusted : 'conn point_info -> unit
|
||||
|
||||
@ -110,7 +112,9 @@ val set_accepted :
|
||||
'conn Info.t -> P2p_peer.Id.t -> Lwt_canceler.t -> unit
|
||||
|
||||
val set_running :
|
||||
?timestamp:Time.t -> 'conn Info.t -> P2p_peer.Id.t -> 'conn -> unit
|
||||
?timestamp:Time.t ->
|
||||
known_private: bool ->
|
||||
'conn Info.t -> P2p_peer.Id.t -> 'conn -> unit
|
||||
|
||||
val set_disconnected :
|
||||
?timestamp:Time.t -> ?requested:bool -> 'conn Info.t -> unit
|
||||
|
@ -815,6 +815,7 @@ and authenticate pool ?point_info canceler fd point =
|
||||
?incoming_message_queue_size:pool.config.incoming_message_queue_size
|
||||
?outgoing_message_queue_size:pool.config.outgoing_message_queue_size
|
||||
?binary_chunks_size:pool.config.binary_chunks_size
|
||||
~private_node:pool.conn_meta_config.private_node
|
||||
auth_fd
|
||||
(pool.conn_meta_config.conn_meta_value info.peer_id)
|
||||
pool.encoding >>=? fun conn ->
|
||||
@ -927,7 +928,10 @@ and create_connection pool p2p_conn id_point point_info peer_info _version =
|
||||
ignore (Lazy.force answerer) ;
|
||||
Option.iter point_info ~f:begin fun point_info ->
|
||||
let point = P2p_point_state.Info.point point_info in
|
||||
P2p_point_state.set_running point_info peer_id conn ;
|
||||
let conn_meta = P2p_socket.meta p2p_conn in
|
||||
P2p_point_state.set_running
|
||||
~known_private:(pool.conn_meta_config.private_node conn_meta)
|
||||
point_info peer_id conn;
|
||||
P2p_point.Table.add pool.connected_points point point_info ;
|
||||
end ;
|
||||
log pool (Connection_established (id_point, peer_id)) ;
|
||||
@ -1102,7 +1106,7 @@ let create config peer_meta_config conn_meta_config message_config io_sched =
|
||||
new_connection = Lwt_condition.create () ;
|
||||
} in
|
||||
let pool = {
|
||||
config ; peer_meta_config ; conn_meta_config; message_config ;
|
||||
config ; peer_meta_config ; conn_meta_config ; message_config ;
|
||||
my_id_points = P2p_point.Table.create 7 ;
|
||||
known_peer_ids = P2p_peer.Table.create 53 ;
|
||||
connected_peer_ids = P2p_peer.Table.create 53 ;
|
||||
|
@ -33,9 +33,9 @@ type 'msg encoding = Encoding : {
|
||||
|
||||
(** {1 Pool management} *)
|
||||
|
||||
type ('msg, 'peer_meta,'conn_meta) t
|
||||
type ('msg, 'peer_meta, 'conn_meta) t
|
||||
|
||||
type ('msg, 'peer_meta,'conn_meta) pool = ('msg, 'peer_meta,'conn_meta) t
|
||||
type ('msg, 'peer_meta, 'conn_meta) pool = ('msg, 'peer_meta, 'conn_meta) t
|
||||
(** The type of a pool of connections, parametrized by resp. the type
|
||||
of messages and the meta-informations associated to an identity and
|
||||
a connection. *)
|
||||
|
@ -251,7 +251,8 @@ let authenticate
|
||||
let info =
|
||||
{ P2p_connection.Info.peer_id = remote_peer_id ;
|
||||
versions = msg.versions ; incoming ;
|
||||
id_point ; remote_socket_port ;} in
|
||||
id_point ; remote_socket_port ;
|
||||
} in
|
||||
return (info, { fd ; info ; cryptobox_data ; ack_encoding })
|
||||
|
||||
type connection = {
|
||||
@ -483,6 +484,7 @@ type ('msg, 'meta) t = {
|
||||
meta : 'meta ;
|
||||
reader : 'msg Reader.t ;
|
||||
writer : 'msg Writer.t ;
|
||||
private_node : bool ;
|
||||
}
|
||||
|
||||
let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2
|
||||
@ -490,10 +492,12 @@ let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2
|
||||
let pp ppf { conn } = P2p_connection.Info.pp ppf conn.info
|
||||
let info { conn } = conn.info
|
||||
let meta { meta } = meta
|
||||
let private_node { private_node } = private_node
|
||||
|
||||
let accept
|
||||
?incoming_message_queue_size ?outgoing_message_queue_size
|
||||
?binary_chunks_size
|
||||
~private_node
|
||||
{ fd ; info ; cryptobox_data ; ack_encoding }
|
||||
ack_param
|
||||
encoding =
|
||||
@ -517,7 +521,8 @@ let accept
|
||||
?size:outgoing_message_queue_size ?binary_chunks_size
|
||||
conn encoding canceler
|
||||
in
|
||||
let conn = { conn ; reader ; writer ; meta } in
|
||||
let conn = { conn ; reader ; writer ; meta ;
|
||||
private_node = private_node meta } in
|
||||
Lwt_canceler.on_cancel canceler begin fun () ->
|
||||
P2p_io_scheduler.close fd >>= fun _ ->
|
||||
Lwt.return_unit
|
||||
|
@ -33,6 +33,7 @@ val equal: ('mst, 'meta) t -> ('msg, 'meta) t -> bool
|
||||
val pp: Format.formatter -> ('msg, 'meta) t -> unit
|
||||
val info: ('msg, 'meta) t -> P2p_connection.Info.t
|
||||
val meta: ('msg, 'meta) t -> 'meta
|
||||
val private_node: ('msg, 'meta) t -> bool
|
||||
|
||||
(** {1 Low-level functions (do not use directly)} *)
|
||||
|
||||
@ -58,6 +59,7 @@ val accept:
|
||||
?incoming_message_queue_size:int ->
|
||||
?outgoing_message_queue_size:int ->
|
||||
?binary_chunks_size: int ->
|
||||
private_node:('conn_meta -> bool) ->
|
||||
'conn_meta authenticated_fd -> 'conn_meta ->
|
||||
'msg Data_encoding.t -> ('msg, 'conn_meta) t tzresult Lwt.t
|
||||
(** (Low-level) (Cancelable) Accepts a remote peer given an
|
||||
|
@ -172,7 +172,9 @@ module Kick = struct
|
||||
|
||||
let client _ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept auth_fd () encoding >>= fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>= fun conn ->
|
||||
_assert (is_rejected conn) __LOC__ "" >>=? fun () ->
|
||||
return ()
|
||||
|
||||
@ -186,7 +188,9 @@ module Kicked = struct
|
||||
|
||||
let server _ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept auth_fd () encoding >>= fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>= fun conn ->
|
||||
_assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () ->
|
||||
return ()
|
||||
|
||||
@ -208,7 +212,9 @@ module Simple_message = struct
|
||||
|
||||
let server ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.write_sync conn simple_msg >>=? fun () ->
|
||||
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
|
||||
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
|
||||
@ -218,7 +224,9 @@ module Simple_message = struct
|
||||
|
||||
let client ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
|
||||
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
|
||||
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
|
||||
@ -240,6 +248,7 @@ module Chunked_message = struct
|
||||
let server ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
~binary_chunks_size:21 auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.write_sync conn simple_msg >>=? fun () ->
|
||||
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
|
||||
@ -251,6 +260,7 @@ module Chunked_message = struct
|
||||
let client ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
~binary_chunks_size:21 auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
|
||||
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
|
||||
@ -272,7 +282,9 @@ module Oversized_message = struct
|
||||
|
||||
let server ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.write_sync conn simple_msg >>=? fun () ->
|
||||
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
|
||||
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
|
||||
@ -282,7 +294,9 @@ module Oversized_message = struct
|
||||
|
||||
let client ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
|
||||
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
|
||||
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
|
||||
@ -302,14 +316,18 @@ module Close_on_read = struct
|
||||
|
||||
let server ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
sync ch >>=? fun () ->
|
||||
P2p_socket.close conn >>= fun _stat ->
|
||||
return ()
|
||||
|
||||
let client ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
sync ch >>=? fun () ->
|
||||
P2p_socket.read conn >>= fun err ->
|
||||
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
|
||||
@ -328,14 +346,18 @@ module Close_on_write = struct
|
||||
|
||||
let server ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.close conn >>= fun _stat ->
|
||||
sync ch >>=? fun ()->
|
||||
return ()
|
||||
|
||||
let client ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
sync ch >>=? fun ()->
|
||||
Lwt_unix.sleep 0.1 >>= fun () ->
|
||||
P2p_socket.write_sync conn simple_msg >>= fun err ->
|
||||
@ -365,7 +387,9 @@ module Garbled_data = struct
|
||||
|
||||
let server _ch sched socket =
|
||||
accept sched socket >>=? fun (_info, auth_fd) ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.raw_write_sync conn garbled_msg >>=? fun () ->
|
||||
P2p_socket.read conn >>= fun err ->
|
||||
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
|
||||
@ -374,7 +398,9 @@ module Garbled_data = struct
|
||||
|
||||
let client _ch sched addr port =
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.accept
|
||||
~private_node:(fun _ -> false)
|
||||
auth_fd () encoding >>=? fun conn ->
|
||||
P2p_socket.read conn >>= fun err ->
|
||||
_assert (is_decoding_error err) __LOC__ "" >>=? fun () ->
|
||||
P2p_socket.close conn >>= fun _stat ->
|
||||
|
Loading…
Reference in New Issue
Block a user