2018-06-29 16:08:08 +04:00
|
|
|
(*****************************************************************************)
|
|
|
|
(* *)
|
|
|
|
(* Open Source License *)
|
|
|
|
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
|
|
|
(* *)
|
|
|
|
(* Permission is hereby granted, free of charge, to any person obtaining a *)
|
|
|
|
(* copy of this software and associated documentation files (the "Software"),*)
|
|
|
|
(* to deal in the Software without restriction, including without limitation *)
|
|
|
|
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
|
|
|
|
(* and/or sell copies of the Software, and to permit persons to whom the *)
|
|
|
|
(* Software is furnished to do so, subject to the following conditions: *)
|
|
|
|
(* *)
|
|
|
|
(* The above copyright notice and this permission notice shall be included *)
|
|
|
|
(* in all copies or substantial portions of the Software. *)
|
|
|
|
(* *)
|
|
|
|
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
|
|
|
|
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
|
|
|
|
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
|
|
|
|
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
|
|
|
|
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
|
|
|
|
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
|
|
|
|
(* DEALINGS IN THE SOFTWARE. *)
|
|
|
|
(* *)
|
|
|
|
(*****************************************************************************)
|
2017-01-14 16:14:02 +04:00
|
|
|
|
|
|
|
(* TODO test `close ~wait:true`. *)
|
|
|
|
|
|
|
|
include Logging.Make(struct let name = "p2p.connection" end)
|
|
|
|
|
2017-01-20 18:25:12 +04:00
|
|
|
(** Encrypted, length-prefixed chunks exchanged over an authenticated
    connection. *)
module Crypto = struct

  (** Upper bound on the size of one frame (length header included): the
      largest value that fits in a 16-bit unsigned integer. *)
  let bufsize = 1 lsl 16 - 1

  (** Size in bytes of the length header that precedes every chunk. *)
  let header_length = 2

  (** Largest clear-text payload accepted by [write_chunk]. *)
  let max_content_length = bufsize - header_length - Crypto_box.boxzerobytes

  (** Per-connection cryptographic state. Each nonce is replaced by its
      successor every time a chunk is written (local) or read (remote). *)
  type data = {
    channel_key : Crypto_box.channel_key ;
    mutable local_nonce : Crypto_box.nonce ;
    mutable remote_nonce : Crypto_box.nonce ;
  }

  (** [write_chunk fd cryptobox_data msg] encrypts [msg] with the channel key
      and the current local nonce, then writes it on [fd] prefixed with its
      encrypted length.

      Fails with [P2p_errors.Invalid_message_size] when [msg] is longer than
      [max_content_length]; write errors of [P2p_io_scheduler.write] are
      propagated. *)
  let write_chunk fd cryptobox_data msg =
    let msglen = MBytes.length msg in
    fail_unless
      (msglen <= max_content_length) P2p_errors.Invalid_message_size >>=? fun () ->
    (* The clear text is laid out after [Crypto_box.zerobytes] zeroed bytes;
       the ciphertext is then read back from the same buffer (presumably the
       NaCl padding convention -- confirm against [Crypto_box]). *)
    let buf = MBytes.make (msglen + Crypto_box.zerobytes) '\x00' in
    MBytes.blit msg 0 buf Crypto_box.zerobytes msglen ;
    (* Consume the nonce before encrypting so it is never reused. *)
    let local_nonce = cryptobox_data.local_nonce in
    cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ;
    Crypto_box.fast_box_noalloc
      cryptobox_data.channel_key local_nonce buf ;
    let encrypted_length = msglen + Crypto_box.boxzerobytes in
    (* The header is written in the padding bytes just before the
       ciphertext, so header and ciphertext form one contiguous slice. *)
    let header_pos = Crypto_box.boxzerobytes - header_length in
    MBytes.set_int16 buf header_pos encrypted_length ;
    let payload =
      MBytes.sub buf header_pos (header_length + encrypted_length) in
    P2p_io_scheduler.write fd payload

  (** [read_chunk fd cryptobox_data] reads one length-prefixed chunk from
      [fd], deciphers it with the channel key and the current remote nonce,
      and returns the clear text.

      Fails with [P2p_errors.Decipher_error] when the announced length is
      smaller than [Crypto_box.boxzerobytes] or when deciphering fails; read
      errors of [P2p_io_scheduler.read_full] are propagated. *)
  let read_chunk fd cryptobox_data =
    let header_buf = MBytes.create header_length in
    P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () ->
    let encrypted_length = MBytes.get_uint16 header_buf 0 in
    (* The length comes from the remote peer. Below [boxzerobytes] the
       buffer handed to the decipher primitive would be undersized and the
       clear-text length computed at the end would be negative, so such a
       chunk is rejected before anything else is read. *)
    fail_unless
      (encrypted_length >= Crypto_box.boxzerobytes)
      P2p_errors.Decipher_error >>=? fun () ->
    let buf = MBytes.make (encrypted_length + Crypto_box.boxzerobytes) '\x00' in
    P2p_io_scheduler.read_full
      ~pos:Crypto_box.boxzerobytes ~len:encrypted_length fd buf >>=? fun () ->
    (* The nonce is consumed whether or not deciphering succeeds. *)
    let remote_nonce = cryptobox_data.remote_nonce in
    cryptobox_data.remote_nonce <- Crypto_box.increment_nonce remote_nonce ;
    match
      Crypto_box.fast_box_open_noalloc
        cryptobox_data.channel_key remote_nonce buf
    with
    | false ->
        fail P2p_errors.Decipher_error
    | true ->
        return (MBytes.sub buf Crypto_box.zerobytes
                  (encrypted_length - Crypto_box.boxzerobytes))

end
|
2017-01-14 16:14:02 +04:00
|
|
|
|
2017-04-18 20:32:31 +04:00
|
|
|
(** [check_binary_chunks_size size] succeeds when [size], once the length
    header and [Crypto_box.boxzerobytes] are subtracted, is strictly
    positive and at most [Crypto.max_content_length]. Otherwise it fails
    with [P2p_errors.Invalid_chunks_size], reporting [size] and the accepted
    bounds. *)
let check_binary_chunks_size size =
  let content_size = size - Crypto_box.boxzerobytes - Crypto.header_length in
  let is_valid =
    content_size > 0 && content_size <= Crypto.max_content_length in
  let rejection =
    P2p_errors.Invalid_chunks_size
      { value = size ;
        min = Crypto.(header_length + Crypto_box.boxzerobytes + 1) ;
        max = Crypto.bufsize ;
      } in
  fail_unless is_valid rejection
|
|
|
|
|
2017-01-14 16:14:02 +04:00
|
|
|
(** First, unencrypted message exchanged by both ends of a connection. *)
module Connection_message = struct

  type t = {
    port : int option ;
    versions : P2p_version.t list ;
    public_key : Crypto_box.public_key ;
    proof_of_work_stamp : Crypto_box.nonce ;
    message_nonce : Crypto_box.nonce ;
  }

  (** Wire encoding of [t]. An absent port is written as [0]; when
      decoding, [0] is read back as [None]. *)
  let encoding =
    let open Data_encoding in
    let to_tuple
        { port ; public_key ; proof_of_work_stamp ;
          message_nonce ; versions } =
      let port =
        match port with
        | None -> 0
        | Some port -> port in
      (port, public_key, proof_of_work_stamp,
       message_nonce, versions)
    and of_tuple
        (port, public_key, proof_of_work_stamp,
         message_nonce, versions) =
      let port =
        match port with
        | 0 -> None
        | port -> Some port in
      { port ; public_key ; proof_of_work_stamp ;
        message_nonce ; versions } in
    conv to_tuple of_tuple
      (obj5
         (req "port" uint16)
         (req "pubkey" Crypto_box.public_key_encoding)
         (req "proof_of_work_stamp" Crypto_box.nonce_encoding)
         (req "message_nonce" Crypto_box.nonce_encoding)
         (req "versions" (Variable.list P2p_version.encoding)))

  (** [write fd message] serialises [message] behind a
      [Crypto.header_length]-byte length header and writes it on [fd].
      Returns the raw bytes that were sent (header included).

      Fails with [P2p_errors.Encoding_error] when the encoded size does not
      fit in the header or when the binary writer does not fill the buffer
      exactly. *)
  let write fd message =
    let body_len = Data_encoding.Binary.length encoding message in
    let header_capacity = 1 lsl (Crypto.header_length * 8) in
    fail_unless
      (body_len < header_capacity)
      P2p_errors.Encoding_error >>=? fun () ->
    let frame_len = Crypto.header_length + body_len in
    let frame = MBytes.create frame_len in
    let written =
      Data_encoding.Binary.write
        encoding message frame Crypto.header_length frame_len in
    match written with
    | None ->
        fail P2p_errors.Encoding_error
    | Some last ->
        if last <> frame_len then
          fail P2p_errors.Encoding_error
        else begin
          MBytes.set_int16 frame 0 body_len ;
          P2p_io_scheduler.write fd frame >>=? fun () ->
          (* The raw frame is handed back to the caller, which uses it
             later to compute the nonces. *)
          return frame
        end

  (** [read fd] reads one length-prefixed connection message from [fd].
      Returns the decoded message together with the raw bytes received
      (header included).

      Fails with [P2p_errors.Decoding_error] when decoding fails or does
      not consume the announced length exactly. *)
  let read fd =
    let header = MBytes.create Crypto.header_length in
    P2p_io_scheduler.read_full
      ~len:Crypto.header_length fd header >>=? fun () ->
    let len = MBytes.get_uint16 header 0 in
    let pos = Crypto.header_length in
    let frame = MBytes.create (pos + len) in
    (* Rebuild the header in the frame so that it mirrors what was sent. *)
    MBytes.set_int16 frame 0 len ;
    P2p_io_scheduler.read_full ~len ~pos fd frame >>=? fun () ->
    match Data_encoding.Binary.read encoding frame pos len with
    | None ->
        fail P2p_errors.Decoding_error
    | Some (next_pos, message) ->
        let fully_consumed = (next_pos = pos + len) in
        if fully_consumed then
          return (message, frame)
        else
          fail P2p_errors.Decoding_error

end
|
|
|
|
|
2018-06-05 03:45:43 +04:00
|
|
|
(** How connection metadata of type ['meta] is produced, serialised and
    interpreted. *)
type 'meta metadata_config = {
  (* Encoding used to write and read metadata on the wire. *)
  conn_meta_encoding : 'meta Data_encoding.t ;
  (* Computes the local metadata to send, given the remote peer id. *)
  conn_meta_value : P2p_peer.Id.t -> 'meta ;
  (* Applied to the metadata received from the remote peer to fill the
     [private_node] field of the connection info. *)
  private_node : 'meta -> bool ;
}
|
|
|
|
|
|
|
|
(** Exchange of connection metadata, as a single encrypted chunk. *)
module Metadata = struct

  (** [write metadata_config fd cryptobox_data message] serialises [message]
      with [metadata_config.conn_meta_encoding] and sends it on [fd] as one
      encrypted chunk.

      Fails with [P2p_errors.Encoding_error] when the binary writer fails or
      does not fill the buffer exactly; errors of [Crypto.write_chunk] are
      propagated. *)
  let write metadata_config fd cryptobox_data message =
    let encoded_message_len =
      Data_encoding.Binary.length metadata_config.conn_meta_encoding message in
    let buf = MBytes.create encoded_message_len in
    match
      Data_encoding.Binary.write
        metadata_config.conn_meta_encoding message buf 0 encoded_message_len
    with
    | None ->
        fail P2p_errors.Encoding_error
    | Some last ->
        fail_unless (last = encoded_message_len)
          P2p_errors.Encoding_error >>=? fun () ->
        (* Parameters are named after what callers actually pass: the
           connection first, then the cryptographic state. *)
        Crypto.write_chunk fd cryptobox_data buf

  (** [read metadata_config fd cryptobox_data] reads one encrypted chunk
      from [fd] and decodes it with [metadata_config.conn_meta_encoding].

      Fails with [P2p_errors.Decoding_error] when decoding fails or leaves
      unread bytes in the chunk; errors of [Crypto.read_chunk] are
      propagated. *)
  let read metadata_config fd cryptobox_data =
    Crypto.read_chunk fd cryptobox_data >>=? fun buf ->
    let length = MBytes.length buf in
    match
      Data_encoding.Binary.read
        metadata_config.conn_meta_encoding buf 0 length
    with
    | None ->
        fail P2p_errors.Decoding_error
    | Some (read_len, message) ->
        if read_len <> length then
          fail P2p_errors.Decoding_error
        else
          return message

end
|
|
|
|
|
2017-01-14 16:14:02 +04:00
|
|
|
(** Final handshake message: each side tells the other whether it accepts
    the connection. *)
module Ack = struct

  type t = Ack | Nack

  (** Tagged union encoding: [Ack] uses tag 0 and [Nack] uses tag 255. *)
  let encoding =
    let open Data_encoding in
    let ack_encoding = obj1 (req "ack" empty) in
    let nack_encoding = obj1 (req "nack" empty) in
    (* Both projections list every constructor so that the compiler flags
       them if a constructor is ever added to [t]. *)
    let ack_case tag =
      case tag ack_encoding
        ~title:"Ack"
        (function
          | Ack -> Some ()
          | Nack -> None)
        (fun () -> Ack) in
    let nack_case tag =
      case tag nack_encoding
        ~title:"Nack"
        (function
          | Nack -> Some ()
          | Ack -> None)
        (fun () -> Nack) in
    union [
      ack_case (Tag 0) ;
      nack_case (Tag 255) ;
    ]

  (** [write fd cryptobox_data message] serialises [message] and sends it on
      [fd] as one encrypted chunk.

      Fails with [P2p_errors.Encoding_error] when the binary writer fails or
      does not fill the buffer exactly; errors of [Crypto.write_chunk] are
      propagated. *)
  let write fd cryptobox_data message =
    let encoded_message_len =
      Data_encoding.Binary.length encoding message in
    let buf = MBytes.create encoded_message_len in
    match Data_encoding.Binary.write encoding message buf 0 encoded_message_len with
    | None ->
        fail P2p_errors.Encoding_error
    | Some last ->
        fail_unless (last = encoded_message_len)
          P2p_errors.Encoding_error >>=? fun () ->
        (* Parameters are named after what callers actually pass: the
           connection first, then the cryptographic state. *)
        Crypto.write_chunk fd cryptobox_data buf

  (** [read fd cryptobox_data] reads one encrypted chunk from [fd] and
      decodes it as an acknowledgement.

      Fails with [P2p_errors.Decoding_error] when decoding fails or leaves
      unread bytes in the chunk; errors of [Crypto.read_chunk] are
      propagated. *)
  let read fd cryptobox_data =
    Crypto.read_chunk fd cryptobox_data >>=? fun buf ->
    let length = MBytes.length buf in
    match Data_encoding.Binary.read encoding buf 0 length with
    | None ->
        fail P2p_errors.Decoding_error
    | Some (read_len, message) ->
        if read_len <> length then
          fail P2p_errors.Decoding_error
        else
          return message

end
|
|
|
|
|
2018-05-15 15:16:08 +04:00
|
|
|
(** Result of a successful [authenticate]: the underlying connection, what
    was learnt about the remote peer, and the state needed to exchange
    encrypted chunks. *)
type 'conn_meta authenticated_fd = {
  (* Scheduler connection used for all reads and writes. *)
  fd: P2p_io_scheduler.connection ;
  (* Information gathered during the handshake. *)
  info: 'conn_meta P2p_connection.Info.t ;
  (* Channel key and nonces for [Crypto.write_chunk] / [Crypto.read_chunk]. *)
  cryptobox_data: Crypto.data ;
}
|
2017-01-14 16:14:02 +04:00
|
|
|
|
2018-06-05 03:45:43 +04:00
|
|
|
(** [kick auth_fd] rejects an authenticated connection: it sends a [Nack]
    and closes the connection. The outcome of both steps is deliberately
    ignored. *)
let kick (auth_fd : _ authenticated_fd) =
  let conn = auth_fd.fd in
  Ack.write conn auth_fd.cryptobox_data Nack >>= fun _nack_outcome ->
  P2p_io_scheduler.close conn >>= fun _close_outcome ->
  Lwt.return_unit
|
|
|
|
|
|
|
|
(* First step of the handshake: both parties introduce themselves. The
   exchange is the same whether we are connecting to a peer or checking an
   incoming connection.

   [authenticate ~proof_of_work_target ~incoming fd point ?listening_port
   identity supported_versions metadata_config]:
   - sends our connection message on [fd] and reads the one of the peer;
   - fails with [P2p_errors.Myself] when the peer id of the remote equals
     ours, and with [P2p_errors.Not_enough_proof_of_work] when its stamp
     does not pass [Crypto_box.check_proof_of_work];
   - derives the channel key and the nonces, then exchanges metadata over
     the encrypted channel.

   Returns the connection info together with the authenticated fd. Errors
   of the underlying reads and writes are propagated. *)
let authenticate
    ~proof_of_work_target
    ~incoming fd (remote_addr, remote_socket_port as point)
    ?listening_port identity supported_versions metadata_config =
  let local_nonce_seed = Crypto_box.random_nonce () in
  lwt_debug "Sending authentication to %a" P2p_point.Id.pp point >>= fun () ->
  Connection_message.write fd
    { public_key = identity.P2p_identity.public_key ;
      proof_of_work_stamp = identity.proof_of_work_stamp ;
      message_nonce = local_nonce_seed ;
      port = listening_port ;
      versions = supported_versions } >>=? fun sent_msg ->
  Connection_message.read fd >>=? fun (msg, recv_msg) ->
  (* For an incoming connection the listening port is the one the peer
     advertises; otherwise it is the port we connected to. *)
  let remote_listening_port =
    if incoming then msg.port else Some remote_socket_port in
  let id_point = remote_addr, remote_listening_port in
  let remote_peer_id = Crypto_box.hash msg.public_key in
  fail_unless
    (remote_peer_id <> identity.P2p_identity.peer_id)
    (P2p_errors.Myself id_point) >>=? fun () ->
  fail_unless
    (Crypto_box.check_proof_of_work
       msg.public_key msg.proof_of_work_stamp proof_of_work_target)
    (P2p_errors.Not_enough_proof_of_work remote_peer_id) >>=? fun () ->
  let channel_key =
    Crypto_box.precompute identity.P2p_identity.secret_key msg.public_key in
  (* Nonces are derived from the raw bytes of both connection messages. *)
  let (local_nonce, remote_nonce) =
    Crypto_box.generate_nonces ~incoming ~sent_msg ~recv_msg in
  let cryptobox_data = { Crypto.channel_key ; local_nonce ; remote_nonce } in
  let local_metadata = metadata_config.conn_meta_value remote_peer_id in
  Metadata.write metadata_config fd cryptobox_data local_metadata >>=? fun () ->
  Metadata.read metadata_config fd cryptobox_data >>=? fun remote_metadata ->
  let info =
    { P2p_connection.Info.peer_id = remote_peer_id ;
      versions = msg.versions ; incoming ;
      id_point ; remote_socket_port ;
      private_node = metadata_config.private_node remote_metadata ;
      local_metadata ;
      remote_metadata ;
    } in
  return (info, { fd ; info ; cryptobox_data })
|
2017-01-14 16:14:02 +04:00
|
|
|
|
2018-06-05 03:45:43 +04:00
|
|
|
(** State shared by the reader and the writer of an accepted connection. *)
type 'meta connection = {
  (* Identifier obtained from [next_conn_id]; compared by [equal]. *)
  id : int ;
  (* Information gathered during the handshake. *)
  info : 'meta P2p_connection.Info.t ;
  (* Scheduler connection used for all reads and writes. *)
  fd : P2p_io_scheduler.connection ;
  (* Channel key and nonces for encrypted chunks. *)
  cryptobox_data : Crypto.data ;
}
|
|
|
|
|
2017-03-14 13:51:44 +04:00
|
|
|
(** [next_conn_id ()] returns a fresh connection identifier. Identifiers
    start at 1 and grow by one at each call. *)
let next_conn_id =
  let last_id = ref 0 in
  fun () ->
    let fresh = !last_id + 1 in
    last_id := fresh ;
    fresh
|
|
|
|
|
2017-01-14 16:14:02 +04:00
|
|
|
(** Background worker that reads encrypted chunks from a connection,
    decodes them incrementally and queues the resulting messages. *)
module Reader = struct

  type ('msg, 'meta) t = {
    canceler: Lwt_canceler.t ;
    conn: 'meta connection ;
    encoding: 'msg Data_encoding.t ;
    (* Decoded messages paired with their size, or the error that stopped
       the worker. *)
    messages: (int * 'msg) tzresult Lwt_pipe.t ;
    mutable worker: unit Lwt.t ;
  }

  (** [read_message st init] decodes one message with [st.encoding],
      reading chunks from the connection for as long as the decoder asks
      for more input. [init] is handed to the stream decoder as its
      initial state.

      Returns [Some (message, size, stream)] on success and [None] on a
      decoding error; errors of [Crypto.read_chunk] and cancellation are
      propagated. *)
  let read_message st init =
    let rec step decoding =
      Lwt_unix.yield () >>= fun () ->
      let open Data_encoding.Binary in
      match decoding with
      | Success { result ; size ; stream } ->
          return_some (result, size, stream)
      | Error _ ->
          lwt_debug "[read_message] incremental decoding error" >>= fun () ->
          return_none
      | Await feed ->
          protect ~canceler:st.canceler begin fun () ->
            Crypto.read_chunk st.conn.fd st.conn.cryptobox_data
          end >>=? fun chunk ->
          lwt_debug
            "reading %d bytes from %a"
            (MBytes.length chunk) P2p_peer.Id.pp st.conn.info.peer_id >>= fun () ->
          step (feed chunk) in
    step (Data_encoding.Binary.read_stream ?init st.encoding)

  (** [worker_loop st stream] reads messages until the connection is
      cancelled or an error occurs. Each decoded message is pushed on
      [st.messages]; a decoding failure pushes [P2p_errors.Decoding_error]
      and cancels the connection; any other error is pushed as is before
      cancelling. *)
  let rec worker_loop st stream =
    let cancel_connection () =
      Lwt_canceler.cancel st.canceler >>= fun () ->
      Lwt.return_unit in
    (* Push [item] on the queue (cancellable) and then yield [next]. *)
    let publish item next =
      protect ~canceler:st.canceler begin fun () ->
        Lwt_pipe.push st.messages item >>= fun () ->
        return next
      end in
    begin
      read_message st stream >>=? function
      | None ->
          publish (Error [P2p_errors.Decoding_error]) None
      | Some (msg, size, stream) ->
          publish (Ok (size, msg)) (Some stream)
    end >>= function
    | Ok (Some stream) ->
        worker_loop st (Some stream)
    | Ok None ->
        cancel_connection ()
    | Error [Canceled | Exn Lwt_pipe.Closed] ->
        lwt_debug "connection closed to %a"
          P2p_peer.Id.pp st.conn.info.peer_id >>= fun () ->
        Lwt.return_unit
    | Error _ as err ->
        Lwt_pipe.safe_push_now st.messages err ;
        cancel_connection ()

  (** [run ?size conn encoding canceler] creates the reader state and
      starts its worker. When [size] is given, the message queue is bounded
      by it, using [compute_size] to weigh each element. Cancelling
      [canceler] closes the queue. *)
  let run ?size conn encoding canceler =
    let compute_size = function
      | Ok (size, _) ->
          (* NOTE(review): the factor 11 is not explained in this file. *)
          (Sys.word_size / 8) * 11 + size + Lwt_pipe.push_overhead
      | Error _ ->
          (* Errors are only pushed when the socket is being closed, so
             their footprint is not accounted for. *)
          0 in
    let size = Option.map size ~f:(fun max -> (max, compute_size)) in
    let messages = Lwt_pipe.create ?size () in
    let st =
      { canceler ; conn ; encoding ; messages ;
        worker = Lwt.return_unit ;
      } in
    Lwt_canceler.on_cancel st.canceler begin fun () ->
      Lwt_pipe.close st.messages ;
      Lwt.return_unit
    end ;
    st.worker <-
      Lwt_utils.worker "reader"
        ~run:(fun () -> worker_loop st None)
        ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
    st

  (** [shutdown st] cancels the connection and waits for the worker. *)
  let shutdown st =
    Lwt_canceler.cancel st.canceler >>= fun () ->
    st.worker

end
|
|
|
|
|
|
|
|
(** Background worker that pops already-encoded messages from a queue and
    writes them on the connection as encrypted chunks. *)
module Writer = struct

  type ('msg, 'meta) t = {
    canceler: Lwt_canceler.t ;
    conn: 'meta connection ;
    encoding: 'msg Data_encoding.t ;
    (* Each element is the list of chunks of one message, with an optional
       wakener notified once the message has been written (or has failed). *)
    messages: (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ;
    mutable worker: unit Lwt.t ;
    binary_chunks_size: int ; (* in bytes *)
  }

  (** [send_message st chunks] writes every chunk of [chunks], in order, as
      an encrypted chunk. Stops at the first error, which is returned. *)
  let send_message st chunks =
    let rec send_all = function
      | [] ->
          return_unit
      | chunk :: rest ->
          protect ~canceler:st.canceler begin fun () ->
            Crypto.write_chunk st.conn.fd st.conn.cryptobox_data chunk
          end >>=? fun () ->
          lwt_debug "writing %d bytes to %a"
            (MBytes.length chunk) P2p_peer.Id.pp st.conn.info.peer_id >>= fun () ->
          send_all rest in
    send_all chunks

  (** [encode_message st msg] serialises [msg] with [st.encoding] and cuts
      the result in pieces with [MBytes.cut st.binary_chunks_size]. Returns
      [P2p_errors.Encoding_error] when serialisation raises
      [Data_encoding.Binary.Write_error]. *)
  let encode_message st msg =
    try
      let bytes = Data_encoding.Binary.to_bytes_exn st.encoding msg in
      ok (MBytes.cut st.binary_chunks_size bytes)
    with Data_encoding.Binary.Write_error _ ->
      error P2p_errors.Encoding_error

  (** [worker_loop st] pops queued messages and sends them until the
      connection is cancelled or a write fails. The wakener attached to a
      message, if any, receives [Ok ()] on success and
      [P2p_errors.Connection_closed] on failure. *)
  let rec worker_loop st =
    let log_closed () =
      lwt_debug "connection closed to %a"
        P2p_peer.Id.pp st.conn.info.peer_id in
    let log_failure trace =
      lwt_log_error
        "@[<v 2>error writing to %a@ %a@]"
        P2p_peer.Id.pp st.conn.info.peer_id pp_print_error trace in
    let cancel_connection () =
      Lwt_canceler.cancel st.canceler >>= fun () ->
      Lwt.return_unit in
    let notify wakener outcome =
      Option.iter wakener ~f:(fun u -> Lwt.wakeup_later u outcome) in
    Lwt_unix.yield () >>= fun () ->
    protect ~canceler:st.canceler begin fun () ->
      Lwt_pipe.pop st.messages >>= return
    end >>= function
    | Error [Canceled | Exn Lwt_pipe.Closed] ->
        log_closed ()
    | Error trace ->
        log_failure trace >>= fun () ->
        cancel_connection ()
    | Ok (chunks, wakener) ->
        send_message st chunks >>= function
        | Ok () ->
            notify wakener (Ok ()) ;
            worker_loop st
        | Error trace ->
            notify wakener (Error [P2p_errors.Connection_closed]) ;
            match trace with
            | [ Canceled | Exn Lwt_pipe.Closed ] ->
                (* Already cancelled or closed: nothing left to cancel. *)
                log_closed ()
            | [ P2p_errors.Connection_closed ] ->
                log_closed () >>= fun () ->
                cancel_connection ()
            | trace ->
                log_failure trace >>= fun () ->
                cancel_connection ()

  (** [run ?size ?binary_chunks_size conn encoding canceler] creates the
      writer state and starts its worker.

      [binary_chunks_size], when given, is the size of a whole chunk on the
      wire; the header and [Crypto_box.boxzerobytes] are subtracted from
      it, and the result must lie in [1 .. Crypto.max_content_length]
      (checked with [assert]). It defaults to [Crypto.max_content_length].
      When [size] is given, the message queue is bounded by it. On
      cancellation the queue is closed and every pending wakener receives
      [Exn Lwt_pipe.Closed]. *)
  let run
      ?size ?binary_chunks_size
      conn encoding canceler =
    let binary_chunks_size =
      match binary_chunks_size with
      | None ->
          Crypto.max_content_length
      | Some wire_size ->
          let content_size =
            wire_size - Crypto_box.boxzerobytes - Crypto.header_length in
          assert (content_size > 0) ;
          assert (content_size <= Crypto.max_content_length) ;
          content_size
    in
    (* NOTE(review): these estimates add [Sys.word_size] (a number of bits)
       where [Reader.run] uses [Sys.word_size / 8] -- confirm which unit is
       intended. *)
    let compute_size (chunks, wakener) =
      let chunks_weight =
        List.fold_left
          (fun acc chunk ->
             acc + MBytes.length chunk + 2 * Sys.word_size)
          0 chunks in
      let wakener_weight =
        match wakener with
        | None -> Sys.word_size
        | Some _ -> 2 * Sys.word_size in
      wakener_weight + chunks_weight + Lwt_pipe.push_overhead
    in
    let size = Option.map size ~f:(fun max -> max, compute_size) in
    let messages = Lwt_pipe.create ?size () in
    let st =
      { canceler ; conn ; encoding ; messages ;
        worker = Lwt.return_unit ;
        binary_chunks_size ;
      } in
    Lwt_canceler.on_cancel st.canceler begin fun () ->
      Lwt_pipe.close st.messages ;
      (* Fail every writer still waiting on a queued message. *)
      let rec drain () =
        if not (Lwt_pipe.is_empty st.messages) then begin
          let _, pending = Lwt_pipe.pop_now_exn st.messages in
          Option.iter pending
            ~f:(fun u -> Lwt.wakeup_later u (Error [Exn Lwt_pipe.Closed])) ;
          drain ()
        end in
      drain () ;
      Lwt.return_unit
    end ;
    st.worker <-
      Lwt_utils.worker "writer"
        ~run:(fun () -> worker_loop st)
        ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
    st

  (** [shutdown st] cancels the connection and waits for the worker. *)
  let shutdown st =
    Lwt_canceler.cancel st.canceler >>= fun () ->
    st.worker

end
|
|
|
|
|
2018-05-28 22:38:08 +04:00
|
|
|
(** An accepted connection: the shared state plus its reader and writer
    workers. *)
type ('msg, 'meta) t = {
  (* State shared by both workers. *)
  conn : 'meta connection ;
  (* Worker filling the queue of incoming messages. *)
  reader : ('msg, 'meta) Reader.t ;
  (* Worker draining the queue of outgoing messages. *)
  writer : ('msg, 'meta) Writer.t ;
}
|
|
|
|
|
2017-03-14 13:51:44 +04:00
|
|
|
(** [equal c1 c2] holds when both connections carry the same identifier. *)
let equal { conn = left ; _ } { conn = right ; _ } =
  left.id = right.id
|
|
|
|
|
2018-06-05 03:45:43 +04:00
|
|
|
(** [pp ppf t] prints the connection info of [t]; metadata is printed as
    nothing. *)
let pp ppf { conn = { info ; _ } ; _ } =
  let pp_no_metadata _ppf _metadata = () in
  P2p_connection.Info.pp pp_no_metadata ppf info
|
2017-01-14 16:14:02 +04:00
|
|
|
(** [info t] is the information gathered during the handshake of [t]. *)
let info { conn = { info ; _ } ; _ } = info
|
2018-06-05 13:41:23 +04:00
|
|
|
(** [local_metadata t] is the metadata we sent to the remote peer. *)
let local_metadata { conn = { info ; _ } ; _ } = info.local_metadata
|
2018-06-05 03:45:43 +04:00
|
|
|
(** [remote_metadata t] is the metadata received from the remote peer. *)
let remote_metadata { conn = { info ; _ } ; _ } = info.remote_metadata
|
|
|
|
(** [private_node t] is the [private_node] flag stored in the connection
    info of [t]. *)
let private_node { conn = { info ; _ } ; _ } = info.private_node
|
2017-01-14 16:14:02 +04:00
|
|
|
|
|
|
|
(** [accept ?incoming_message_queue_size ?outgoing_message_queue_size
    ?binary_chunks_size auth_fd encoding] completes the handshake on an
    authenticated connection: it sends [Ack], waits for the answer of the
    peer and, on [Ack], starts the reader and writer workers.

    If the acknowledgement exchange fails, the connection is closed and
    [P2p_errors.Connection_closed] is reported as
    [P2p_errors.Rejected_socket_connection], [P2p_errors.Decipher_error] as
    [P2p_errors.Invalid_auth]; other errors are returned unchanged. A
    [Nack] from the peer yields [P2p_errors.Rejected_socket_connection]. *)
let accept
    ?incoming_message_queue_size ?outgoing_message_queue_size
    ?binary_chunks_size
    ({ fd ; info ; cryptobox_data } : 'meta authenticated_fd)
    encoding =
  let close_fd () =
    P2p_io_scheduler.close fd >>= fun _ ->
    Lwt.return_unit in
  let exchange_acks () =
    Ack.write fd cryptobox_data Ack >>=? fun () ->
    Ack.read fd cryptobox_data in
  let on_handshake_error trace =
    close_fd () >>= fun () ->
    match trace with
    | [ P2p_errors.Connection_closed ] ->
        fail P2p_errors.Rejected_socket_connection
    | [ P2p_errors.Decipher_error ] ->
        fail P2p_errors.Invalid_auth
    | trace ->
        Lwt.return (Error trace) in
  protect exchange_acks ~on_error:on_handshake_error >>=? function
  | Ack ->
      (* One canceler is shared by both workers; cancelling it also closes
         the underlying connection. *)
      let canceler = Lwt_canceler.create () in
      let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
      let reader =
        Reader.run ?size:incoming_message_queue_size conn encoding canceler
      and writer =
        Writer.run
          ?size:outgoing_message_queue_size ?binary_chunks_size
          conn encoding canceler
      in
      let conn = { conn ; reader ; writer } in
      Lwt_canceler.on_cancel canceler close_fd ;
      return conn
  | Nack ->
      fail P2p_errors.Rejected_socket_connection
|
2017-01-14 16:14:02 +04:00
|
|
|
|
|
|
|
(** [catch_closed_pipe f] runs [f ()] and reports a closed pipe as
    [P2p_errors.Connection_closed], whether it shows up as a raised
    [Lwt_pipe.Closed] or as the error [Exn Lwt_pipe.Closed]. Any other
    exception [exn] becomes the error [Exn exn]; other results are
    returned unchanged. *)
let catch_closed_pipe f =
  let of_exception = function
    | Lwt_pipe.Closed -> fail P2p_errors.Connection_closed
    | exn -> fail (Exn exn) in
  let normalize = function
    | Error [Exn Lwt_pipe.Closed] ->
        fail P2p_errors.Connection_closed
    | Error _ | Ok _ as outcome ->
        Lwt.return outcome in
  Lwt.catch f of_exception >>= normalize
|
2017-01-14 16:14:02 +04:00
|
|
|
|
2017-11-11 06:34:12 +04:00
|
|
|
(** [pp_json encoding ppf msg] prints [msg] on [ppf] as the JSON value
    built with [encoding]. *)
let pp_json encoding ppf msg =
  let json = Data_encoding.Json.construct encoding msg in
  Data_encoding.Json.pp ppf json
|
2017-11-11 06:34:12 +04:00
|
|
|
|
|
|
|
(** [write t msg] encodes [msg] and queues it for the writer worker. It
    returns once the message is in the queue, without waiting for it to be
    written. Fails with the encoding error, or with
    [P2p_errors.Connection_closed] when the queue is closed. *)
let write { writer ; conn ; _ } msg =
  catch_closed_pipe begin fun () ->
    debug "Sending message to %a: %a"
      P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ;
    match Writer.encode_message writer msg with
    | Error trace ->
        Lwt.return (Error trace)
    | Ok chunks ->
        Lwt_pipe.push writer.messages (chunks, None) >>= fun () ->
        return_unit
  end
|
2017-11-11 06:34:12 +04:00
|
|
|
|
|
|
|
(** [write_sync t msg] encodes [msg], queues it for the writer worker and
    waits for the outcome reported by the worker. Fails with the encoding
    error, the error reported by the worker, or
    [P2p_errors.Connection_closed] when the queue is closed. *)
let write_sync { writer ; conn ; _ } msg =
  catch_closed_pipe begin fun () ->
    let written, notify_written = Lwt.wait () in
    debug "Sending message to %a: %a"
      P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ;
    match Writer.encode_message writer msg with
    | Error trace ->
        Lwt.return (Error trace)
    | Ok chunks ->
        Lwt_pipe.push writer.messages (chunks, Some notify_written) >>= fun () ->
        written
  end
|
2017-11-11 06:34:12 +04:00
|
|
|
|
|
|
|
(** [write_now t msg] encodes [msg] and tries to queue it without
    blocking. Returns the result of [Lwt_pipe.push_now], the encoding
    error, or [P2p_errors.Connection_closed] when the queue is closed. *)
let write_now { writer ; conn ; _ } msg =
  debug "Try sending message to %a: %a"
    P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ;
  match Writer.encode_message writer msg with
  | Error trace ->
      Error trace
  | Ok chunks ->
      begin
        match Lwt_pipe.push_now writer.messages (chunks, None) with
        | pushed -> Ok pushed
        | exception Lwt_pipe.Closed -> Error [P2p_errors.Connection_closed]
      end
|
2017-01-14 16:14:02 +04:00
|
|
|
|
2017-04-18 20:32:31 +04:00
|
|
|
(** [split_bytes size bytes] cuts [bytes] into consecutive pieces of at
    most [size] bytes, in order. When [bytes] already fits in [size], the
    result is [[bytes]] itself.

    The traversal is tail-recursive, so the number of pieces is not limited
    by the stack.

    @raise Invalid_argument if [bytes] must be cut and [size] is not
    strictly positive (cutting would otherwise never make progress). *)
let split_bytes size bytes =
  let total = MBytes.length bytes in
  if total <= size then
    [bytes]
  else if size <= 0 then
    invalid_arg "split_bytes: chunk size must be positive"
  else
    (* [acc] holds the pieces already cut, most recent first. *)
    let rec collect offset acc =
      let remaining = total - offset in
      if remaining <= size then
        List.rev (MBytes.sub bytes offset remaining :: acc)
      else
        collect (offset + size) (MBytes.sub bytes offset size :: acc) in
    collect 0 []
|
|
|
|
|
2017-04-09 21:05:56 +04:00
|
|
|
(** [raw_write_sync t bytes] queues already-serialised [bytes], cut in
    pieces of at most [binary_chunks_size] bytes, and waits for the outcome
    reported by the writer worker. Fails with
    [P2p_errors.Connection_closed] when the queue is closed. *)
let raw_write_sync { writer ; _ } bytes =
  let chunks = split_bytes writer.binary_chunks_size bytes in
  catch_closed_pipe begin fun () ->
    let written, notify_written = Lwt.wait () in
    Lwt_pipe.push writer.messages (chunks, Some notify_written) >>= fun () ->
    written
  end
|
|
|
|
|
2017-01-14 16:14:02 +04:00
|
|
|
(** [is_readable t] holds when the queue of incoming messages is not
    empty. *)
let is_readable { reader ; _ } =
  let incoming = reader.messages in
  not (Lwt_pipe.is_empty incoming)
|
|
|
|
(** [wait_readable t] waits on [Lwt_pipe.values_available] for the queue
    of incoming messages. Fails with [P2p_errors.Connection_closed] when
    the queue is closed. *)
let wait_readable { reader ; _ } =
  catch_closed_pipe begin fun () ->
    Lwt_pipe.values_available reader.messages >>= fun available ->
    return available
  end
|
|
|
|
(** [read t] pops the next element of the queue of incoming messages: a
    message with its size, or the error pushed by the reader worker. Fails
    with [P2p_errors.Connection_closed] when the queue is closed. *)
let read { reader ; _ } =
  let pop_next () = Lwt_pipe.pop reader.messages in
  catch_closed_pipe pop_next
|
|
|
|
(** [read_now t] is the result of [Lwt_pipe.pop_now] on the queue of
    incoming messages, or [Some (Error [P2p_errors.Connection_closed])]
    when the queue is closed. *)
let read_now { reader ; _ } =
  match Lwt_pipe.pop_now reader.messages with
  | popped -> popped
  | exception Lwt_pipe.Closed ->
      Some (Error [P2p_errors.Connection_closed])
|
2017-01-14 16:14:02 +04:00
|
|
|
|
|
|
|
(** [stat t] is [P2p_io_scheduler.stat] applied to the underlying
    connection of [t]. *)
let stat { conn ; _ } =
  P2p_io_scheduler.stat conn.fd
|
|
|
|
|
|
|
|
(** [close ?wait st] shuts the connection down: reader first, then writer,
    then the underlying file descriptor (whose closing outcome is ignored).

    With [~wait:true], both message queues are closed first and the writer
    worker is awaited before the shutdown proceeds. [wait] defaults to
    [false]. *)
let close ?(wait = false) st =
  let writer_done =
    if wait then begin
      Lwt_pipe.close st.reader.messages ;
      Lwt_pipe.close st.writer.messages ;
      st.writer.worker
    end else
      Lwt.return_unit in
  writer_done >>= fun () ->
  Reader.shutdown st.reader >>= fun () ->
  Writer.shutdown st.writer >>= fun () ->
  P2p_io_scheduler.close st.conn.fd >>= fun _close_outcome ->
  Lwt.return_unit
|