Shell: implement P2p_connection
This commit is contained in:
parent
2ed8bf2cfa
commit
6d47cb2c8f
1
.gitignore
vendored
1
.gitignore
vendored
@ -40,6 +40,7 @@
|
||||
/test/test-basic
|
||||
/test/test-data-encoding
|
||||
/test/test-p2p-io-scheduler
|
||||
/test/test-p2p-connection
|
||||
/test/LOG
|
||||
|
||||
*~
|
||||
|
@ -259,6 +259,7 @@ NODE_LIB_INTFS := \
|
||||
\
|
||||
node/net/p2p_types.mli \
|
||||
node/net/p2p_io_scheduler.mli \
|
||||
node/net/p2p_connection.mli \
|
||||
node/net/p2p.mli \
|
||||
node/net/RPC_server.mli \
|
||||
\
|
||||
@ -291,6 +292,7 @@ NODE_LIB_IMPLS := \
|
||||
\
|
||||
node/net/p2p_types.ml \
|
||||
node/net/p2p_io_scheduler.ml \
|
||||
node/net/p2p_connection.ml \
|
||||
node/net/p2p.ml \
|
||||
\
|
||||
node/net/RPC_server.ml \
|
||||
|
410
src/node/net/p2p_connection.ml
Normal file
410
src/node/net/p2p_connection.ml
Normal file
@ -0,0 +1,410 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2016. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(* TODO encode/encrypt before to push into the writer pipe. *)
|
||||
(* TODO patch Sodium.Box to avoid allocation of the encrypted buffer.*)
|
||||
(* TODO patch Data_encoding for continuation-based binary writer/reader. *)
|
||||
(* TODO use queue bound by memory size of its elements, not by the
|
||||
number of elements. *)
|
||||
(* TODO test `close ~wait:true`. *)
|
||||
(* TODO nothing in welcoming message proves that the incoming peer is
|
||||
the owner of the public key... only the first message will
|
||||
really proves it. Should this to be changed? Not really
|
||||
important, but... an attacker might forge a random public key
|
||||
with enough proof of work (hard task), open a connection, wait
|
||||
infinitly. This would avoid the real peer to talk with us. And
|
||||
this might also have an influence on its "score". *)
|
||||
|
||||
open P2p_types
|
||||
|
||||
include Logging.Make(struct let name = "p2p.connection" end)
|
||||
|
||||
type error += Decipher_error
|
||||
type error += Invalid_message_size
|
||||
type error += Encoding_error
|
||||
type error += Rejected
|
||||
type error += Decoding_error
|
||||
type error += Myself of Id_point.t
|
||||
type error += Not_enough_proof_of_work of Gid.t
|
||||
|
||||
type cryptobox_data = {
|
||||
channel_key : Crypto_box.channel_key ;
|
||||
mutable local_nonce : Crypto_box.nonce ;
|
||||
mutable remote_nonce : Crypto_box.nonce ;
|
||||
}
|
||||
|
||||
let header_length = 2
|
||||
let crypto_overhead = 18 (* FIXME import from Sodium.Box. *)
|
||||
let max_content_length =
|
||||
1 lsl (header_length * 8) - crypto_overhead
|
||||
|
||||
module Connection_message = struct
|
||||
|
||||
type t = {
|
||||
port : int option ;
|
||||
versions : Version.t list ;
|
||||
public_key : Crypto_box.public_key ;
|
||||
proof_of_work_stamp : Crypto_box.nonce ;
|
||||
message_nonce : Crypto_box.nonce ;
|
||||
}
|
||||
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun { port ; public_key ; proof_of_work_stamp ;
|
||||
message_nonce ; versions } ->
|
||||
let port = match port with None -> 0 | Some port -> port in
|
||||
(port, public_key, proof_of_work_stamp,
|
||||
message_nonce, versions))
|
||||
(fun (port, public_key, proof_of_work_stamp,
|
||||
message_nonce, versions) ->
|
||||
let port = if port = 0 then None else Some port in
|
||||
{ port ; public_key ; proof_of_work_stamp ;
|
||||
message_nonce ; versions })
|
||||
(obj5
|
||||
(req "port" uint16)
|
||||
(req "pubkey" Crypto_box.public_key_encoding)
|
||||
(req "proof_of_work_stamp" Crypto_box.nonce_encoding)
|
||||
(req "message_nonce" Crypto_box.nonce_encoding)
|
||||
(req "versions" (Variable.list Version.encoding)))
|
||||
|
||||
let write fd message =
|
||||
let encoded_message_len =
|
||||
Data_encoding.Binary.length encoding message in
|
||||
fail_unless
|
||||
(encoded_message_len < max_content_length)
|
||||
Encoding_error >>=? fun () ->
|
||||
let len = header_length + encoded_message_len in
|
||||
let buf = MBytes.create len in
|
||||
match Data_encoding.Binary.write encoding message buf header_length with
|
||||
| None ->
|
||||
fail Encoding_error
|
||||
| Some last ->
|
||||
fail_unless (last = len) Encoding_error >>=? fun () ->
|
||||
MBytes.set_int16 buf 0 encoded_message_len ;
|
||||
P2p_io_scheduler.write fd buf
|
||||
|
||||
let read fd =
|
||||
let header_buf = MBytes.create header_length in
|
||||
P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () ->
|
||||
let len = MBytes.get_uint16 header_buf 0 in
|
||||
let buf = MBytes.create len in
|
||||
P2p_io_scheduler.read_full ~len fd buf >>=? fun () ->
|
||||
match Data_encoding.Binary.read encoding buf 0 len with
|
||||
| None ->
|
||||
fail Decoding_error
|
||||
| Some (read_len, message) ->
|
||||
if read_len <> len then
|
||||
fail Decoding_error
|
||||
else
|
||||
return message
|
||||
|
||||
end
|
||||
|
||||
module Ack = struct
|
||||
|
||||
type t = bool
|
||||
let ack = MBytes.of_string "\255"
|
||||
let nack = MBytes.of_string "\000"
|
||||
|
||||
let write fd b =
|
||||
match b with
|
||||
| true ->
|
||||
P2p_io_scheduler.write fd ack
|
||||
| false ->
|
||||
P2p_io_scheduler.write fd nack
|
||||
|
||||
let read fd =
|
||||
let buf = MBytes.create 1 in
|
||||
P2p_io_scheduler.read_full fd buf >>=? fun () ->
|
||||
return (buf <> nack)
|
||||
|
||||
end
|
||||
|
||||
type authenticated_fd =
|
||||
P2p_io_scheduler.connection * Connection_info.t * cryptobox_data
|
||||
|
||||
let kick (fd, _ , _) =
|
||||
Ack.write fd false >>= fun _ ->
|
||||
P2p_io_scheduler.close fd >>= fun _ ->
|
||||
Lwt.return_unit
|
||||
|
||||
(* First step: write and read credentials, makes no difference
|
||||
whether we're trying to connect to a peer or checking an incoming
|
||||
connection, both parties must first introduce themselves. *)
|
||||
let authenticate
|
||||
~proof_of_work_target
|
||||
~incoming fd (remote_addr, remote_socket_port as point)
|
||||
?listening_port identity supported_versions =
|
||||
let local_nonce = Crypto_box.random_nonce () in
|
||||
lwt_debug "Sending authenfication to %a" Point.pp point >>= fun () ->
|
||||
Connection_message.write fd
|
||||
{ public_key = identity.Identity.public_key ;
|
||||
proof_of_work_stamp = identity.proof_of_work_stamp ;
|
||||
message_nonce = local_nonce ;
|
||||
port = listening_port ;
|
||||
versions = supported_versions } >>=? fun () ->
|
||||
Connection_message.read fd >>=? fun msg ->
|
||||
let remote_listening_port =
|
||||
if incoming then msg.port else Some remote_socket_port in
|
||||
let id_point = remote_addr, remote_listening_port in
|
||||
let remote_gid = Crypto_box.hash msg.public_key in
|
||||
fail_unless
|
||||
(remote_gid <> identity.Identity.gid)
|
||||
(Myself id_point) >>=? fun () ->
|
||||
fail_unless
|
||||
(Crypto_box.check_proof_of_work
|
||||
msg.public_key msg.proof_of_work_stamp proof_of_work_target)
|
||||
(Not_enough_proof_of_work remote_gid) >>=? fun () ->
|
||||
let channel_key =
|
||||
Crypto_box.precompute identity.Identity.secret_key msg.public_key in
|
||||
let info =
|
||||
{ Connection_info.gid = remote_gid ; versions = msg.versions ; incoming ;
|
||||
id_point ; remote_socket_port ;} in
|
||||
let cryptobox_data =
|
||||
{ channel_key ; local_nonce ;
|
||||
remote_nonce = msg.message_nonce } in
|
||||
return (info, (fd, info, cryptobox_data))
|
||||
|
||||
type connection = {
|
||||
info : Connection_info.t ;
|
||||
fd : P2p_io_scheduler.connection ;
|
||||
cryptobox_data : cryptobox_data ;
|
||||
}
|
||||
|
||||
module Reader = struct
|
||||
|
||||
type 'msg t = {
|
||||
canceler: Canceler.t ;
|
||||
conn: connection ;
|
||||
encoding: 'msg Data_encoding.t ;
|
||||
messages: 'msg tzresult Lwt_pipe.t ;
|
||||
mutable worker: unit Lwt.t ;
|
||||
}
|
||||
|
||||
let read_chunk { fd ; cryptobox_data } =
|
||||
let header_buf = MBytes.create header_length in
|
||||
P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () ->
|
||||
let len = MBytes.get_uint16 header_buf 0 in
|
||||
let buf = MBytes.create len in
|
||||
P2p_io_scheduler.read_full ~len fd buf >>=? fun () ->
|
||||
let remote_nonce = cryptobox_data.remote_nonce in
|
||||
cryptobox_data.remote_nonce <- Crypto_box.increment_nonce remote_nonce ;
|
||||
match
|
||||
Crypto_box.fast_box_open cryptobox_data.channel_key buf remote_nonce
|
||||
with
|
||||
| None ->
|
||||
fail Decipher_error
|
||||
| Some buf ->
|
||||
return buf
|
||||
|
||||
let rec read_message st buf =
|
||||
return (Data_encoding.Binary.of_bytes st.encoding buf)
|
||||
|
||||
let rec worker_loop st =
|
||||
Lwt_unix.yield () >>= fun () ->
|
||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
||||
read_chunk st.conn >>=? fun buf ->
|
||||
read_message st buf
|
||||
end >>= function
|
||||
| Ok None ->
|
||||
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
|
||||
worker_loop st
|
||||
| Ok (Some msg) ->
|
||||
Lwt_pipe.push st.messages (Ok msg) >>= fun () ->
|
||||
worker_loop st
|
||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
||||
Lwt.return_unit
|
||||
| Error _ as err ->
|
||||
Lwt_pipe.push st.messages err >>= fun () ->
|
||||
Canceler.cancel st.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
|
||||
let run ?size conn encoding canceler =
|
||||
let st =
|
||||
{ canceler ; conn ; encoding ;
|
||||
messages = Lwt_pipe.create ?size () ;
|
||||
worker = Lwt.return_unit ;
|
||||
} in
|
||||
Canceler.on_cancel st.canceler begin fun () ->
|
||||
Lwt_pipe.close st.messages ;
|
||||
Lwt.return_unit
|
||||
end ;
|
||||
st.worker <-
|
||||
Lwt_utils.worker "reader"
|
||||
(fun () -> worker_loop st)
|
||||
(fun () -> Canceler.cancel st.canceler) ;
|
||||
st
|
||||
|
||||
let shutdown st =
|
||||
Canceler.cancel st.canceler >>= fun () ->
|
||||
st.worker
|
||||
|
||||
end
|
||||
|
||||
module Writer = struct
|
||||
|
||||
type 'msg t = {
|
||||
canceler: Canceler.t ;
|
||||
conn: connection ;
|
||||
encoding: 'msg Data_encoding.t ;
|
||||
messages: ('msg * unit tzresult Lwt.u option) Lwt_pipe.t ;
|
||||
mutable worker: unit Lwt.t ;
|
||||
}
|
||||
|
||||
let write_chunk { cryptobox_data ; fd } buf =
|
||||
let header_buf = MBytes.create header_length in
|
||||
let local_nonce = cryptobox_data.local_nonce in
|
||||
cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ;
|
||||
let encrypted_message =
|
||||
Crypto_box.fast_box cryptobox_data.channel_key buf local_nonce in
|
||||
let encrypted_len = MBytes.length encrypted_message in
|
||||
fail_unless
|
||||
(encrypted_len < max_content_length)
|
||||
Invalid_message_size >>=? fun () ->
|
||||
MBytes.set_int16 header_buf 0 encrypted_len ;
|
||||
P2p_io_scheduler.write fd header_buf >>=? fun () ->
|
||||
P2p_io_scheduler.write fd encrypted_message >>=? fun () ->
|
||||
return ()
|
||||
|
||||
let encode_message st msg =
|
||||
try return (Data_encoding.Binary.to_bytes st.encoding msg)
|
||||
with _ -> fail Encoding_error
|
||||
|
||||
let rec worker_loop st =
|
||||
Lwt_unix.yield () >>= fun () ->
|
||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
||||
Lwt_pipe.pop st.messages >>= fun (msg, wakener) ->
|
||||
encode_message st msg >>=? fun buf ->
|
||||
write_chunk st.conn buf >>= fun res ->
|
||||
iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ;
|
||||
Lwt.return res
|
||||
end >>= function
|
||||
| Ok () ->
|
||||
worker_loop st
|
||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
||||
Lwt.return_unit
|
||||
| Error err ->
|
||||
lwt_log_error
|
||||
"@[<v 2>Error while writing to %a@ %a@]"
|
||||
Connection_info.pp st.conn.info pp_print_error err >>= fun () ->
|
||||
Canceler.cancel st.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
|
||||
let run ?size conn encoding canceler =
|
||||
let st =
|
||||
{ canceler ; conn ; encoding ;
|
||||
messages = Lwt_pipe.create ?size () ;
|
||||
worker = Lwt.return_unit ;
|
||||
} in
|
||||
Canceler.on_cancel st.canceler begin fun () ->
|
||||
Lwt_pipe.close st.messages ;
|
||||
Lwt.return_unit
|
||||
end ;
|
||||
st.worker <-
|
||||
Lwt_utils.worker "writer"
|
||||
(fun () -> worker_loop st)
|
||||
(fun () -> Canceler.cancel st.canceler) ;
|
||||
st
|
||||
|
||||
let shutdown st =
|
||||
Canceler.cancel st.canceler >>= fun () ->
|
||||
st.worker
|
||||
|
||||
end
|
||||
|
||||
type 'msg t = {
|
||||
conn : connection ;
|
||||
reader : 'msg Reader.t ;
|
||||
writer : 'msg Writer.t ;
|
||||
}
|
||||
|
||||
let pp ppf { conn } = Connection_info.pp ppf conn.info
|
||||
let info { conn } = conn.info
|
||||
|
||||
let accept
|
||||
?incoming_message_queue_size ?outgoing_message_queue_size
|
||||
(fd, info, cryptobox_data) encoding =
|
||||
Lwt_utils.protect begin fun () ->
|
||||
Ack.write fd true >>=? fun () ->
|
||||
Ack.read fd
|
||||
end ~on_error:begin fun err ->
|
||||
P2p_io_scheduler.close fd >>= fun _ ->
|
||||
Lwt.return (Error err)
|
||||
end >>=? fun accepted ->
|
||||
fail_unless accepted Rejected >>=? fun () ->
|
||||
let canceler = Canceler.create () in
|
||||
let conn = { fd ; info ; cryptobox_data } in
|
||||
let reader =
|
||||
Reader.run ?size:incoming_message_queue_size conn encoding canceler
|
||||
and writer =
|
||||
Writer.run ?size:outgoing_message_queue_size conn encoding canceler in
|
||||
let conn = { conn ; reader ; writer } in
|
||||
Canceler.on_cancel canceler begin fun () ->
|
||||
P2p_io_scheduler.close fd >>= fun _ ->
|
||||
Lwt.return_unit
|
||||
end ;
|
||||
return conn
|
||||
|
||||
exception Not_available
|
||||
exception Connection_closed
|
||||
|
||||
let catch_closed_pipe f =
|
||||
Lwt.catch f begin function
|
||||
| Lwt_pipe.Closed -> fail P2p_io_scheduler.Connection_closed
|
||||
| exn -> fail (Exn exn)
|
||||
end
|
||||
|
||||
let is_writable { writer } =
|
||||
not (Lwt_pipe.is_full writer.messages)
|
||||
let wait_writable { writer } =
|
||||
Lwt_pipe.not_full writer.messages
|
||||
let write { writer } msg =
|
||||
catch_closed_pipe begin fun () ->
|
||||
Lwt_pipe.push writer.messages (msg, None) >>= return
|
||||
end
|
||||
let write_sync { writer } msg =
|
||||
catch_closed_pipe begin fun () ->
|
||||
let waiter, wakener = Lwt.wait () in
|
||||
Lwt_pipe.push writer.messages (msg, Some wakener) >>= fun () ->
|
||||
waiter
|
||||
end
|
||||
let write_now { writer } msg =
|
||||
try Ok (Lwt_pipe.push_now writer.messages (msg, None))
|
||||
with Lwt_pipe.Closed -> Error [P2p_io_scheduler.Connection_closed]
|
||||
|
||||
let is_readable { reader } =
|
||||
not (Lwt_pipe.is_empty reader.messages)
|
||||
let wait_readable { reader } =
|
||||
catch_closed_pipe begin fun () ->
|
||||
Lwt_pipe.values_available reader.messages >>= return
|
||||
end
|
||||
let read { reader } =
|
||||
catch_closed_pipe begin fun () ->
|
||||
Lwt_pipe.pop reader.messages
|
||||
end
|
||||
let read_now { reader } =
|
||||
try Lwt_pipe.pop_now reader.messages
|
||||
with Lwt_pipe.Closed -> Some (Error [P2p_io_scheduler.Connection_closed])
|
||||
|
||||
let stat { conn = { fd } } = P2p_io_scheduler.stat fd
|
||||
|
||||
let close ?(wait = false) st =
|
||||
begin
|
||||
if not wait then Lwt.return_unit
|
||||
else begin
|
||||
Lwt_pipe.close st.reader.messages ;
|
||||
Lwt_pipe.close st.writer.messages ;
|
||||
st.writer.worker
|
||||
end
|
||||
end >>= fun () ->
|
||||
Reader.shutdown st.reader >>= fun () ->
|
||||
Writer.shutdown st.writer >>= fun () ->
|
||||
P2p_io_scheduler.close st.conn.fd >>= fun _ ->
|
||||
Lwt.return_unit
|
119
src/node/net/p2p_connection.mli
Normal file
119
src/node/net/p2p_connection.mli
Normal file
@ -0,0 +1,119 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2016. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** This modules adds message encoding and encryption to
|
||||
[P2p_io_scheduler]'s generic throttled connections.
|
||||
|
||||
Each connection have an associated internal read (resp. write)
|
||||
queue containing messages (of type ['msg]), whose size can be
|
||||
limited by providing corresponding arguments to [accept].
|
||||
*)
|
||||
|
||||
open P2p_types
|
||||
|
||||
(** {1 Types} *)
|
||||
|
||||
type error += Decipher_error
|
||||
type error += Invalid_message_size
|
||||
type error += Encoding_error
|
||||
type error += Decoding_error
|
||||
type error += Rejected
|
||||
type error += Myself of Id_point.t
|
||||
type error += Not_enough_proof_of_work of Gid.t
|
||||
|
||||
type authenticated_fd
|
||||
(** Type of a connection that successfully passed the authentication
|
||||
phase, but has not been accepted yet. *)
|
||||
|
||||
type 'msg t
|
||||
(** Type of an accepted connection, parametrized by the type of
|
||||
messages exchanged between peers. *)
|
||||
|
||||
val pp : Format.formatter -> 'msg t -> unit
|
||||
val info: 'msg t -> Connection_info.t
|
||||
|
||||
(** {1 Low-level functions (do not use directly)} *)
|
||||
|
||||
val authenticate:
|
||||
proof_of_work_target:Crypto_box.target ->
|
||||
incoming:bool ->
|
||||
P2p_io_scheduler.connection -> Point.t ->
|
||||
?listening_port: int ->
|
||||
Identity.t -> Version.t list ->
|
||||
(Connection_info.t * authenticated_fd) tzresult Lwt.t
|
||||
(** (Low-level) (Cancelable) Authentication function of a remote
|
||||
peer. Used in [P2p_connection_pool], to promote a
|
||||
[P2P_io_scheduler.connection] into an [authenticated_fd] (auth
|
||||
correct, acceptation undecided). *)
|
||||
|
||||
val kick: authenticated_fd -> unit Lwt.t
|
||||
(** (Low-level) (Cancelable) [kick afd] notifies the remote peer that
|
||||
we refuse this connection and then closes [afd]. Used in
|
||||
[P2p_connection_pool] to reject an [aunthenticated_fd] which we do
|
||||
not want to connect to for some reason. *)
|
||||
|
||||
val accept:
|
||||
?incoming_message_queue_size:int ->
|
||||
?outgoing_message_queue_size:int ->
|
||||
authenticated_fd -> 'msg Data_encoding.t -> 'msg t tzresult Lwt.t
|
||||
(** (Low-level) (Cancelable) Accepts a remote peer given an
|
||||
authenticated_fd. Used in [P2p_connection_pool], to promote an
|
||||
[authenticated_fd] to the status of an active peer. *)
|
||||
|
||||
(** {1 IO functions on connections} *)
|
||||
|
||||
(** {2 Output functions} *)
|
||||
|
||||
val is_writable: 'msg t -> bool
|
||||
(** [is_writable conn] is [true] iff [conn] internal write queue is
|
||||
not full. *)
|
||||
|
||||
val wait_writable: 'msg t -> unit Lwt.t
|
||||
(** (Cancelable) [wait_writable conn] returns when [conn]'s internal
|
||||
write queue becomes writable (i.e. not full). *)
|
||||
|
||||
val write: 'msg t -> 'msg -> unit tzresult Lwt.t
|
||||
(** [write conn msg] returns when [msg] has successfully been added to
|
||||
[conn]'s internal write queue or fails with a corresponding
|
||||
error. *)
|
||||
|
||||
val write_now: 'msg t -> 'msg -> bool tzresult
|
||||
(** [write_now conn msg] is [Ok true] if [msg] has been added to
|
||||
[conn]'s internal write queue, [Ok false] if [msg] has been
|
||||
dropped, or fails with a correponding error otherwise. *)
|
||||
|
||||
val write_sync: 'msg t -> 'msg -> unit tzresult Lwt.t
|
||||
(** [write_sync conn msg] returns when [msg] has been successfully
|
||||
sent to the remote end of [conn], or fails accordingly. *)
|
||||
|
||||
(** {2 Input functions} *)
|
||||
|
||||
val is_readable: 'msg t -> bool
|
||||
(** [is_readable conn] is [true] iff [conn] internal read queue is not
|
||||
empty. *)
|
||||
|
||||
val wait_readable: 'msg t -> unit tzresult Lwt.t
|
||||
(** (Cancelable) [wait_readable conn] returns when [conn]'s internal
|
||||
read queue becomes readable (i.e. not empty). *)
|
||||
|
||||
val read: 'msg t -> 'msg tzresult Lwt.t
|
||||
(** [read conn msg] returns when [msg] has successfully been popped
|
||||
from [conn]'s internal read queue or fails with a corresponding
|
||||
error. *)
|
||||
|
||||
val read_now: 'msg t -> 'msg tzresult option
|
||||
(** [read_now conn msg] is [Some msg] if [conn]'s internal read queue
|
||||
is not empty, [None] if it is empty, or fails with a correponding
|
||||
error otherwise. *)
|
||||
|
||||
val stat: 'msg t -> Stat.t
|
||||
(** [stat conn] is a snapshot of current bandwidth usage for
|
||||
[conn]. *)
|
||||
|
||||
val close: ?wait:bool -> 'msg t -> unit Lwt.t
|
@ -11,6 +11,37 @@ open Logging.Net
|
||||
|
||||
module Canceler = Lwt_utils.Canceler
|
||||
|
||||
module Version = struct
|
||||
type t = {
|
||||
name : string ;
|
||||
major : int ;
|
||||
minor : int ;
|
||||
}
|
||||
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun { name; major; minor } -> (name, major, minor))
|
||||
(fun (name, major, minor) -> { name; major; minor })
|
||||
(obj3
|
||||
(req "name" string)
|
||||
(req "major" int8)
|
||||
(req "minor" int8))
|
||||
|
||||
(* the common version for a pair of peers, if any, is the maximum one,
|
||||
in lexicographic order *)
|
||||
let common la lb =
|
||||
let la = List.sort (fun l r -> compare r l) la in
|
||||
let lb = List.sort (fun l r -> compare r l) lb in
|
||||
let rec find = function
|
||||
| [], _ | _, [] -> None
|
||||
| ((a :: ta) as la), ((b :: tb) as lb) ->
|
||||
if a = b then Some a
|
||||
else if a < b then find (ta, lb)
|
||||
else find (la, tb)
|
||||
in find (la, lb)
|
||||
end
|
||||
|
||||
module Stat = struct
|
||||
|
||||
type t = {
|
||||
@ -44,3 +75,151 @@ module Gid = struct
|
||||
module Set = Set.Make (Crypto_box.Public_key_hash)
|
||||
module Table = Hash.Hash_table (Crypto_box.Public_key_hash)
|
||||
end
|
||||
|
||||
(* public types *)
|
||||
type addr = Ipaddr.V6.t
|
||||
type port = int
|
||||
|
||||
module Point = struct
|
||||
|
||||
module T = struct
|
||||
(* A net point (address x port). *)
|
||||
type t = addr * port
|
||||
let compare (a1, p1) (a2, p2) =
|
||||
match Ipaddr.V6.compare a1 a2 with
|
||||
| 0 -> p1 - p2
|
||||
| x -> x
|
||||
let equal p1 p2 = compare p1 p2 = 0
|
||||
let hash = Hashtbl.hash
|
||||
let pp ppf (addr, port) =
|
||||
Format.fprintf ppf "[%a]:%d" Ipaddr.V6.pp_hum addr port
|
||||
let pp_opt ppf = function
|
||||
| None -> Format.pp_print_string ppf "none"
|
||||
| Some point -> pp ppf point
|
||||
|
||||
let is_local (addr, _) = Ipaddr.V6.is_private addr
|
||||
let is_global (addr, _) = not @@ Ipaddr.V6.is_private addr
|
||||
|
||||
let to_sockaddr (addr, port) = Unix.(ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port))
|
||||
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun (addr, port) -> Ipaddr.V6.to_string addr, port)
|
||||
(fun (addr, port) -> Ipaddr.V6.of_string_exn addr, port)
|
||||
(obj2
|
||||
(req "addr" string)
|
||||
(req "port" int16))
|
||||
end
|
||||
|
||||
include T
|
||||
|
||||
(* Run-time point-or-gid indexed storage, one point is bound to at
|
||||
most one gid, which is the invariant we want to keep both for the
|
||||
connected peers table and the known peers one *)
|
||||
|
||||
module Map = Map.Make (T)
|
||||
module Set = Set.Make (T)
|
||||
module Table = Hashtbl.Make (T)
|
||||
|
||||
end
|
||||
|
||||
module Id_point = struct
|
||||
|
||||
module T = struct
|
||||
(* A net point (address x port). *)
|
||||
type t = addr * port option
|
||||
let empty = Ipaddr.V6.unspecified, None
|
||||
let compare (a1, p1) (a2, p2) =
|
||||
match Ipaddr.V6.compare a1 a2 with
|
||||
| 0 -> Pervasives.compare p1 p2
|
||||
| x -> x
|
||||
let equal p1 p2 = compare p1 p2 = 0
|
||||
let hash = Hashtbl.hash
|
||||
let pp ppf (addr, port) =
|
||||
match port with
|
||||
| None ->
|
||||
Format.fprintf ppf "[%a]:??" Ipaddr.V6.pp_hum addr
|
||||
| Some port ->
|
||||
Format.fprintf ppf "[%a]:%d" Ipaddr.V6.pp_hum addr port
|
||||
let pp_opt ppf = function
|
||||
| None -> Format.pp_print_string ppf "none"
|
||||
| Some point -> pp ppf point
|
||||
|
||||
let is_local (addr, _) = Ipaddr.V6.is_private addr
|
||||
let is_global (addr, _) = not @@ Ipaddr.V6.is_private addr
|
||||
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun (addr, port) -> Ipaddr.V6.to_bytes addr, port)
|
||||
(fun (addr, port) -> Ipaddr.V6.of_bytes_exn addr, port)
|
||||
(obj2
|
||||
(req "addr" string)
|
||||
(opt "port" int16))
|
||||
end
|
||||
|
||||
include T
|
||||
|
||||
(* Run-time point-or-gid indexed storage, one point is bound to at
|
||||
most one gid, which is the invariant we want to keep both for the
|
||||
connected peers table and the known peers one *)
|
||||
|
||||
module Map = Map.Make (T)
|
||||
module Set = Set.Make (T)
|
||||
module Table = Hashtbl.Make (T)
|
||||
|
||||
end
|
||||
|
||||
module Identity = struct
|
||||
|
||||
type t = {
|
||||
gid : Gid.t ;
|
||||
public_key : Crypto_box.public_key ;
|
||||
secret_key : Crypto_box.secret_key ;
|
||||
proof_of_work_stamp : Crypto_box.nonce ;
|
||||
}
|
||||
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun { public_key ; secret_key ; proof_of_work_stamp } ->
|
||||
(public_key, secret_key, proof_of_work_stamp))
|
||||
(fun (public_key, secret_key, proof_of_work_stamp) ->
|
||||
let gid = Crypto_box.hash public_key in
|
||||
{ gid ; public_key ; secret_key ; proof_of_work_stamp })
|
||||
(obj3
|
||||
(req "public_key" Crypto_box.public_key_encoding)
|
||||
(req "secret_key" Crypto_box.secret_key_encoding)
|
||||
(req "proof_of_work_stamp" Crypto_box.nonce_encoding))
|
||||
|
||||
let generate target =
|
||||
let secret_key, public_key, gid = Crypto_box.random_keypair () in
|
||||
let proof_of_work_stamp =
|
||||
Crypto_box.generate_proof_of_work public_key target in
|
||||
{ gid ; public_key ; secret_key ; proof_of_work_stamp }
|
||||
|
||||
end
|
||||
|
||||
module Connection_info = struct
|
||||
|
||||
type t = {
|
||||
incoming : bool;
|
||||
gid : Gid.t;
|
||||
id_point : Id_point.t;
|
||||
remote_socket_port : port;
|
||||
versions : Version.t list ;
|
||||
}
|
||||
|
||||
let pp ppf
|
||||
{ incoming ; id_point = (remote_addr, remote_port) ; gid } =
|
||||
Format.fprintf ppf "%a:%a {%a}%s"
|
||||
Ipaddr.V6.pp_hum remote_addr
|
||||
(fun ppf port ->
|
||||
match port with
|
||||
| None -> Format.pp_print_string ppf "??"
|
||||
| Some port -> Format.pp_print_int ppf port) remote_port
|
||||
Gid.pp gid
|
||||
(if incoming then " (incoming)" else "")
|
||||
|
||||
end
|
||||
|
@ -9,6 +9,93 @@
|
||||
|
||||
module Canceler = Lwt_utils.Canceler
|
||||
|
||||
(** Protocol version *)
|
||||
|
||||
module Version : sig
|
||||
type t = {
|
||||
name : string ;
|
||||
major : int ;
|
||||
minor : int ;
|
||||
}
|
||||
(** Type of a protocol version. *)
|
||||
|
||||
val encoding : t Data_encoding.t
|
||||
val common: t list -> t list -> t option
|
||||
end
|
||||
|
||||
|
||||
(** Gid, i.e. persistent peer identifier *)
|
||||
|
||||
module Gid : sig
|
||||
type t = Crypto_box.Public_key_hash.t
|
||||
(** Type of a gid, a public key hash. *)
|
||||
|
||||
val compare : t -> t -> int
|
||||
val equal : t -> t -> bool
|
||||
val pp : Format.formatter -> t -> unit
|
||||
val encoding : t Data_encoding.t
|
||||
module Map : Map.S with type key = t
|
||||
module Set : Set.S with type elt = t
|
||||
module Table : Hashtbl.S with type key = t
|
||||
end
|
||||
|
||||
type addr = Ipaddr.V6.t
|
||||
type port = int
|
||||
|
||||
|
||||
(** Point, i.e. socket address *)
|
||||
|
||||
module Point : sig
|
||||
type t = addr * port
|
||||
val compare : t -> t -> int
|
||||
val pp : Format.formatter -> t -> unit
|
||||
val pp_opt : Format.formatter -> t option -> unit
|
||||
val encoding : t Data_encoding.t
|
||||
val is_local : t -> bool
|
||||
val is_global : t -> bool
|
||||
val to_sockaddr : t -> Unix.sockaddr
|
||||
module Map : Map.S with type key = t
|
||||
module Set : Set.S with type elt = t
|
||||
module Table : Hashtbl.S with type key = t
|
||||
end
|
||||
|
||||
(** Point representing a reachable socket address *)
|
||||
|
||||
module Id_point : sig
|
||||
type t = addr * port option
|
||||
val compare : t -> t -> int
|
||||
val equal : t -> t -> bool
|
||||
val pp : Format.formatter -> t -> unit
|
||||
val pp_opt : Format.formatter -> t option -> unit
|
||||
val encoding : t Data_encoding.t
|
||||
val is_local : t -> bool
|
||||
val is_global : t -> bool
|
||||
module Map : Map.S with type key = t
|
||||
module Set : Set.S with type elt = t
|
||||
module Table : Hashtbl.S with type key = t
|
||||
end
|
||||
|
||||
|
||||
(** Identity *)
|
||||
|
||||
module Identity : sig
|
||||
type t = {
|
||||
gid : Gid.t ;
|
||||
public_key : Crypto_box.public_key ;
|
||||
secret_key : Crypto_box.secret_key ;
|
||||
proof_of_work_stamp : Crypto_box.nonce ;
|
||||
}
|
||||
(** Type of an identity, comprising a gid, a crypto keypair, and a
|
||||
proof of work stamp with enough difficulty so that the network
|
||||
accept this identity as genuine. *)
|
||||
|
||||
val encoding : t Data_encoding.t
|
||||
|
||||
val generate : Crypto_box.target -> t
|
||||
(** [generate target] is a freshly minted identity whose proof of
|
||||
work stamp difficulty is at least equal to [target]. *)
|
||||
end
|
||||
|
||||
|
||||
(** Bandwidth usage statistics *)
|
||||
|
||||
@ -24,3 +111,19 @@ module Stat : sig
|
||||
val pp: Format.formatter -> t -> unit
|
||||
|
||||
end
|
||||
|
||||
(** Information about a connection *)
|
||||
|
||||
module Connection_info : sig
|
||||
|
||||
type t = {
|
||||
incoming : bool;
|
||||
gid : Gid.t;
|
||||
id_point : Id_point.t;
|
||||
remote_socket_port : port;
|
||||
versions : Version.t list ;
|
||||
}
|
||||
|
||||
val pp: Format.formatter -> t -> unit
|
||||
|
||||
end
|
||||
|
@ -3,7 +3,8 @@ TESTS := \
|
||||
data-encoding \
|
||||
store context state \
|
||||
basic basic.sh \
|
||||
p2p-io-scheduler
|
||||
p2p-io-scheduler \
|
||||
p2p-connection \
|
||||
|
||||
all: test
|
||||
|
||||
@ -182,7 +183,7 @@ clean::
|
||||
############################################################################
|
||||
## p2p test program
|
||||
|
||||
.PHONY:build-test-io_schduler
|
||||
.PHONY:build-test-p2p-io-scheduler run-test-p2p-io-scheduler
|
||||
build-test-p2p-io-scheduler: test-p2p-io-scheduler
|
||||
run-test-p2p-io-scheduler:
|
||||
./test-p2p-io-scheduler \
|
||||
@ -190,16 +191,30 @@ run-test-p2p-io-scheduler:
|
||||
--max-upload-speed $$((1 << 18)) \
|
||||
--max-download-speed $$((1 << 20))
|
||||
|
||||
.PHONY:build-test-p2p-connection run-test-p2p-connection
|
||||
build-test-p2p-connection: test-p2p-connection
|
||||
run-test-p2p-connection:
|
||||
./test-p2p-connection
|
||||
|
||||
TEST_P2P_IO_SCHEDULER_IMPLS = \
|
||||
lib/process.ml \
|
||||
test_p2p_io_scheduler.ml
|
||||
|
||||
TEST_P2P_CONNECTION_IMPLS = \
|
||||
lib/process.ml \
|
||||
test_p2p_connection.ml
|
||||
|
||||
${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}: ${NODELIB}
|
||||
test-p2p-io-scheduler: ${NODELIB} ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}
|
||||
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
|
||||
|
||||
${TEST_P2P_CONNECTION_IMPLS:.ml=.cmx}: ${NODELIB}
|
||||
test-p2p-connection: ${NODELIB} ${TEST_P2P_CONNECTION_IMPLS:.ml=.cmx}
|
||||
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
|
||||
|
||||
clean::
|
||||
rm -f test-p2p-io_scheduler
|
||||
-rm -f test-p2p-io_scheduler
|
||||
-rm -f test-p2p-connection
|
||||
|
||||
############################################################################
|
||||
## lwt pipe test program
|
||||
|
204
test/test_p2p_connection.ml
Normal file
204
test/test_p2p_connection.ml
Normal file
@ -0,0 +1,204 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2016. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(* TODO Use Kaputt on the client side and remove `assert` from the
|
||||
server. *)
|
||||
|
||||
open Error_monad
|
||||
open P2p_types
|
||||
include Logging.Make (struct let name = "test-p2p-connection" end)
|
||||
|
||||
let proof_of_work_target =
|
||||
Crypto_box.make_target [Int64.shift_left 1L 48]
|
||||
let id1 = Identity.generate proof_of_work_target
|
||||
let id2 = Identity.generate proof_of_work_target
|
||||
|
||||
let id0 =
|
||||
(* Luckilly, this will be an insuficient proof of work! *)
|
||||
Identity.generate (Crypto_box.make_target [])
|
||||
|
||||
let versions = Version.[{ name = "TEST" ; minor = 0 ; major = 0 }]
|
||||
|
||||
let rec listen ?port addr =
|
||||
let tentative_port =
|
||||
match port with
|
||||
| None -> 1024 + Random.int 8192
|
||||
| Some port -> port in
|
||||
let uaddr = Ipaddr_unix.V6.to_inet_addr addr in
|
||||
let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in
|
||||
Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ;
|
||||
Lwt.catch begin fun () ->
|
||||
Lwt_unix.Versioned.bind_2 main_socket
|
||||
(ADDR_INET (uaddr, tentative_port)) >>= fun () ->
|
||||
Lwt_unix.listen main_socket 1 ;
|
||||
Lwt.return (main_socket, tentative_port)
|
||||
end begin function
|
||||
| Unix.Unix_error
|
||||
((Unix.EADDRINUSE | Unix.EADDRNOTAVAIL), _, _) when port = None ->
|
||||
listen addr
|
||||
| exn -> Lwt.fail exn
|
||||
end
|
||||
|
||||
let raw_accept sched main_socket =
|
||||
Lwt_unix.accept main_socket >>= fun (fd, sockaddr) ->
|
||||
let fd = P2p_io_scheduler.register sched fd in
|
||||
let point =
|
||||
match sockaddr with
|
||||
| Lwt_unix.ADDR_UNIX _ -> assert false
|
||||
| Lwt_unix.ADDR_INET (addr, port) ->
|
||||
Ipaddr_unix.V6.of_inet_addr_exn addr, port in
|
||||
Lwt.return (fd, point)
|
||||
|
||||
let accept sched main_socket =
|
||||
raw_accept sched main_socket >>= fun (fd, point) ->
|
||||
P2p_connection.authenticate
|
||||
~proof_of_work_target
|
||||
~incoming:true fd point id1 versions
|
||||
|
||||
let raw_connect sched addr port =
|
||||
let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in
|
||||
let uaddr =
|
||||
Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
|
||||
Lwt_unix.connect fd uaddr >>= fun () ->
|
||||
let fd = P2p_io_scheduler.register sched fd in
|
||||
Lwt.return fd
|
||||
|
||||
let connect sched addr port id =
|
||||
raw_connect sched addr port >>= fun fd ->
|
||||
P2p_connection.authenticate
|
||||
~proof_of_work_target
|
||||
~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) ->
|
||||
assert (not info.incoming) ;
|
||||
assert (Gid.compare info.gid id1.gid = 0) ;
|
||||
return auth_fd
|
||||
|
||||
let simple_msg =
|
||||
MBytes.create (1 lsl 1)
|
||||
|
||||
let is_rejected = function
|
||||
| Error [P2p_connection.Rejected] -> true
|
||||
| Ok _ | Error _ -> false
|
||||
|
||||
let is_connection_closed = function
|
||||
| Error [P2p_io_scheduler.Connection_closed] -> true
|
||||
| Ok _ | Error _ -> false
|
||||
|
||||
let bytes_encoding = Data_encoding.Variable.bytes
|
||||
|
||||
let server main_socket =
|
||||
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
|
||||
(* Low-level test. *)
|
||||
raw_accept sched main_socket >>= fun (fd, point) ->
|
||||
lwt_log_notice "Low_level" >>= fun () ->
|
||||
P2p_io_scheduler.write fd simple_msg >>=? fun () ->
|
||||
P2p_io_scheduler.close fd >>=? fun _ ->
|
||||
lwt_log_notice "Low_level OK" >>= fun () ->
|
||||
(* Kick the first connection. *)
|
||||
accept sched main_socket >>=? fun (info, auth_fd) ->
|
||||
lwt_log_notice "Kick" >>= fun () ->
|
||||
assert (info.incoming) ;
|
||||
assert (Gid.compare info.gid id2.gid = 0) ;
|
||||
P2p_connection.kick auth_fd >>= fun () ->
|
||||
lwt_log_notice "Kick OK" >>= fun () ->
|
||||
(* Let's be rejected. *)
|
||||
accept sched main_socket >>=? fun (info, auth_fd) ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>= fun conn ->
|
||||
assert (is_rejected conn) ;
|
||||
lwt_log_notice "Kicked OK" >>= fun () ->
|
||||
(* Accept and send a single message. *)
|
||||
accept sched main_socket >>=? fun (info, auth_fd) ->
|
||||
lwt_log_notice "Single" >>= fun () ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.write_sync conn simple_msg >>=? fun () ->
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Single OK" >>= fun () ->
|
||||
(* Accept and send a single message, while the client expected 2. *)
|
||||
accept sched main_socket >>=? fun (info, auth_fd) ->
|
||||
lwt_log_notice "Early close (read)" >>= fun () ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.write_sync conn simple_msg >>=? fun () ->
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Early close (read) OK" >>= fun () ->
|
||||
(* Accept and wait for the client to close the connection. *)
|
||||
accept sched main_socket >>=? fun (info, auth_fd) ->
|
||||
lwt_log_notice "Early close (write)" >>= fun () ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Early close (write) OK" >>= fun () ->
|
||||
P2p_io_scheduler.shutdown sched >>= fun () ->
|
||||
Lwt_unix.sleep 0.2 >>= fun () ->
|
||||
lwt_log_notice "Success" >>= fun () ->
|
||||
return ()
|
||||
|
||||
let client addr port =
|
||||
let msg = MBytes.create (MBytes.length simple_msg) in
|
||||
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
|
||||
raw_connect sched addr port >>= fun fd ->
|
||||
P2p_io_scheduler.read_full fd msg >>=? fun () ->
|
||||
assert (MBytes.compare simple_msg msg = 0) ;
|
||||
P2p_io_scheduler.close fd >>=? fun () ->
|
||||
lwt_log_notice "Low_level OK" >>= fun () ->
|
||||
(* let's be rejected. *)
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>= fun conn ->
|
||||
assert (is_rejected conn) ;
|
||||
lwt_log_notice "Kick OK" >>= fun () ->
|
||||
(* let's reject! *)
|
||||
lwt_log_notice "Kicked" >>= fun () ->
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.kick auth_fd >>= fun () ->
|
||||
(* let's exchange a simple message. *)
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.read conn >>=? fun msg ->
|
||||
assert (MBytes.compare simple_msg msg = 0) ;
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Simple OK" >>= fun () ->
|
||||
(* let's detect a closed connection on `read`. *)
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.read conn >>=? fun msg ->
|
||||
assert (MBytes.compare simple_msg msg = 0) ;
|
||||
P2p_connection.read conn >>= fun msg ->
|
||||
assert (is_connection_closed msg) ;
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Early close (read) OK" >>= fun () ->
|
||||
(* let's detect a closed connection on `write`. *)
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
Lwt_unix.sleep 0.1 >>= fun () ->
|
||||
P2p_connection.write_sync conn simple_msg >>= fun unit ->
|
||||
assert (is_connection_closed unit) ;
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Early close (write) OK" >>= fun () ->
|
||||
P2p_io_scheduler.shutdown sched >>= fun () ->
|
||||
lwt_log_notice "Success" >>= fun () ->
|
||||
return ()
|
||||
|
||||
let default_addr = Ipaddr.V6.localhost
|
||||
|
||||
let main () =
|
||||
listen default_addr >>= fun (main_socket, port) ->
|
||||
let server =
|
||||
Process.detach ~prefix:"server " begin fun () ->
|
||||
Process.handle_error begin fun () ->
|
||||
server main_socket
|
||||
end
|
||||
end in
|
||||
let client =
|
||||
Process.detach ~prefix:"client " begin fun () ->
|
||||
Lwt_utils.safe_close main_socket >>= fun () ->
|
||||
Process.handle_error begin fun () ->
|
||||
client default_addr port
|
||||
end
|
||||
end in
|
||||
Process.wait [ server ; client ]
|
||||
|
||||
let () =
|
||||
Lwt_main.run (main ())
|
Loading…
Reference in New Issue
Block a user