ligo/src/lib_p2p/p2p_socket.ml

561 lines
20 KiB
OCaml
Raw Normal View History

2017-01-14 16:14:02 +04:00
(**************************************************************************)
(* *)
2018-02-06 00:17:03 +04:00
(* Copyright (c) 2014 - 2018. *)
2017-01-14 16:14:02 +04:00
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(* TODO patch Data_encoding for continuation-based binary writer/reader. *)
(* TODO test `close ~wait:true`. *)
(* TODO nothing in the welcoming message proves that the incoming peer is
the owner of the public key... only the first message will
really prove it. Should this be changed? Not really
important, but... an attacker might forge a random public key
with enough proof of work (hard task), open a connection, and wait
indefinitely. This would prevent the real peer from talking with us. And
this might also have an influence on its "score". *)
include Logging.Make(struct let name = "p2p.connection" end)
2017-01-20 18:25:12 +04:00
(* Framing and encryption of the chunks exchanged once a channel key has
   been computed.

   On the wire a chunk is a [header_length]-byte length field followed by
   that many bytes of encrypted data; the encrypted data is
   [Crypto_box.boxzerobytes] bytes longer than the plaintext. *)
module Crypto = struct

  (* Upper bound on the size of a chunk on the wire, header included. *)
  let bufsize = (1 lsl 16) - 1

  (* Size in bytes of the length field that prefixes every chunk. *)
  let header_length = 2

  (* Largest plaintext accepted by [write_chunk]. *)
  let max_content_length = bufsize - header_length - Crypto_box.boxzerobytes

  (* Per-connection state: the channel key and the two nonces, each of
     which is incremented every time a chunk is written (local) or read
     (remote). *)
  type data = {
    channel_key : Crypto_box.channel_key ;
    mutable local_nonce : Crypto_box.nonce ;
    mutable remote_nonce : Crypto_box.nonce ;
  }

  (* [write_chunk fd cryptobox_data msg] encrypts [msg] in place in a
     freshly allocated buffer, prefixes it with its encrypted length and
     writes the result on [fd].

     Fails with [P2p_errors.Invalid_message_size] when [msg] is longer
     than [max_content_length]. The local nonce is incremented before
     the write is attempted. *)
  let write_chunk fd cryptobox_data msg =
    let msglen = MBytes.length msg in
    fail_unless
      (msglen <= max_content_length)
      P2p_errors.Invalid_message_size >>=? fun () ->
    (* The plaintext is placed after [zerobytes] bytes of zero padding,
       which is the layout [fast_box_noalloc] is called with. *)
    let buf = MBytes.make (msglen + Crypto_box.zerobytes) '\x00' in
    MBytes.blit msg 0 buf Crypto_box.zerobytes msglen ;
    let local_nonce = cryptobox_data.local_nonce in
    cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ;
    Crypto_box.fast_box_noalloc cryptobox_data.channel_key local_nonce buf ;
    let encrypted_length = msglen + Crypto_box.boxzerobytes in
    (* The header overwrites the last [header_length] bytes of the
       leading padding, so header and encrypted data are contiguous. *)
    let header_pos = Crypto_box.boxzerobytes - header_length in
    MBytes.set_int16 buf header_pos encrypted_length ;
    let payload =
      MBytes.sub buf header_pos (header_length + encrypted_length) in
    P2p_io_scheduler.write fd payload

  (* [read_chunk fd cryptobox_data] reads one chunk from [fd] and returns
     its decrypted content.

     Fails with [P2p_errors.Decipher_error] when the announced length is
     too short to hold an encrypted chunk, or when decryption fails. *)
  let read_chunk fd cryptobox_data =
    let header_buf = MBytes.create header_length in
    P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () ->
    let encrypted_length = MBytes.get_uint16 header_buf 0 in
    (* The length comes from the remote peer. [write_chunk] never emits
       less than [boxzerobytes] bytes, and a smaller value would make
       the final [MBytes.sub] below receive a negative length: reject it
       up front instead of relying on the decryption primitive. *)
    fail_unless
      (encrypted_length >= Crypto_box.boxzerobytes)
      P2p_errors.Decipher_error >>=? fun () ->
    let buf =
      MBytes.make (encrypted_length + Crypto_box.boxzerobytes) '\x00' in
    P2p_io_scheduler.read_full
      ~pos:Crypto_box.boxzerobytes ~len:encrypted_length fd buf >>=? fun () ->
    let remote_nonce = cryptobox_data.remote_nonce in
    cryptobox_data.remote_nonce <- Crypto_box.increment_nonce remote_nonce ;
    match
      Crypto_box.fast_box_open_noalloc
        cryptobox_data.channel_key remote_nonce buf
    with
    | false ->
        fail P2p_errors.Decipher_error
    | true ->
        return (MBytes.sub buf Crypto_box.zerobytes
                  (encrypted_length - Crypto_box.boxzerobytes))

end
2017-01-14 16:14:02 +04:00
2017-04-18 20:32:31 +04:00
(* [check_binary_chunks_size size] validates a requested on-wire chunk
   size (header and encryption overhead included).

   Fails with [P2p_errors.Invalid_chunks_size] unless the room left for
   content is strictly positive and at most [Crypto.max_content_length]. *)
let check_binary_chunks_size size =
  let overhead = Crypto.header_length + Crypto_box.boxzerobytes in
  let content_size = size - overhead in
  let is_valid =
    content_size > 0 && content_size <= Crypto.max_content_length in
  fail_unless is_valid
    (P2p_errors.Invalid_chunks_size
       { value = size ;
         min = overhead + 1 ;
         max = Crypto.bufsize ;
       })
2017-01-14 16:14:02 +04:00
(* The unencrypted message each side sends first on a new socket. *)
module Connection_message = struct

  type t = {
    port : int option ;
    versions : P2p_version.t list ;
    public_key : Crypto_box.public_key ;
    proof_of_work_stamp : Crypto_box.nonce ;
    message_nonce : Crypto_box.nonce ;
  }

  (* Binary/JSON encoding of [t]. An absent port is encoded as [0], and
     a decoded [0] is read back as [None]. *)
  let encoding =
    let open Data_encoding in
    let to_tuple
        { port ; public_key ; proof_of_work_stamp ;
          message_nonce ; versions } =
      let port = match port with Some port -> port | None -> 0 in
      (port, public_key, proof_of_work_stamp, message_nonce, versions) in
    let of_tuple
        (port, public_key, proof_of_work_stamp, message_nonce, versions) =
      let port = match port with 0 -> None | port -> Some port in
      { port ; public_key ; proof_of_work_stamp ;
        message_nonce ; versions } in
    conv to_tuple of_tuple
      (obj5
         (req "port" uint16)
         (req "pubkey" Crypto_box.public_key_encoding)
         (req "proof_of_work_stamp" Crypto_box.nonce_encoding)
         (req "message_nonce" Crypto_box.nonce_encoding)
         (req "versions" (Variable.list P2p_version.encoding)))

  (* [write fd message] sends [message] on [fd], prefixed by its encoded
     length, and returns the raw bytes that were sent (header included).

     Fails with [P2p_errors.Encoding_error] when the encoded message does
     not fit the length header or when the encoder does not fill the
     buffer exactly. *)
  let write fd message =
    let payload_len = Data_encoding.Binary.length encoding message in
    let length_limit = 1 lsl (Crypto.header_length * 8) in
    fail_unless
      (payload_len < length_limit)
      P2p_errors.Encoding_error >>=? fun () ->
    let total_len = Crypto.header_length + payload_len in
    let buf = MBytes.create total_len in
    match
      Data_encoding.Binary.write
        encoding message buf Crypto.header_length total_len
    with
    | None ->
        fail P2p_errors.Encoding_error
    | Some last ->
        if last <> total_len then
          fail P2p_errors.Encoding_error
        else begin
          MBytes.set_int16 buf 0 payload_len ;
          P2p_io_scheduler.write fd buf >>=? fun () ->
          (* The raw message is handed back because it is used later to
             compute the nonces. *)
          return buf
        end

  (* [read fd] reads one length-prefixed connection message from [fd] and
     returns it together with the raw bytes received (header included).

     Fails with [P2p_errors.Decoding_error] when decoding fails or does
     not consume exactly the announced number of bytes. *)
  let read fd =
    let header_len = Crypto.header_length in
    let header_buf = MBytes.create header_len in
    P2p_io_scheduler.read_full ~len:header_len fd header_buf >>=? fun () ->
    let len = MBytes.get_uint16 header_buf 0 in
    let buf = MBytes.create (header_len + len) in
    (* Rebuild the header in [buf] so that it holds the full raw message. *)
    MBytes.set_int16 buf 0 len ;
    P2p_io_scheduler.read_full ~len ~pos:header_len fd buf >>=? fun () ->
    match Data_encoding.Binary.read encoding buf header_len len with
    | Some (next_pos, message) when next_pos = header_len + len ->
        return (message, buf)
    | Some _ | None ->
        fail P2p_errors.Decoding_error

end
(* One-byte accept/reject message exchanged, encrypted, right after
   authentication. *)
module Ack = struct

  type t = Ack | Nack

  (* Plaintext payloads of the two messages. *)
  let ack = MBytes.of_string "\255"
  let nack = MBytes.of_string "\000"

  (* [write fd cryptobox_data b] sends [b] as an encrypted chunk on [fd].
     The parameters are named in the order in which every caller, and
     [Crypto.write_chunk], actually pass them. *)
  let write fd cryptobox_data b =
    let payload = match b with Ack -> ack | Nack -> nack in
    Crypto.write_chunk fd cryptobox_data payload

  (* [read fd cryptobox_data] reads one encrypted chunk and returns
     [false] when its content equals [nack], [true] for anything else. *)
  let read fd cryptobox_data =
    Crypto.read_chunk fd cryptobox_data >>=? fun buf ->
    return (buf <> nack)

end
(* A connection that went through [authenticate] but has not been
   accepted yet: the scheduler connection, the information gathered about
   the remote peer, and the encryption state of the channel. Consumed by
   [kick] and [accept]. *)
type authenticated_fd =
P2p_io_scheduler.connection * P2p_connection.Info.t * Crypto.data
2017-01-14 16:14:02 +04:00
2017-01-20 18:25:12 +04:00
(* [kick authenticated_fd] rejects an authenticated connection: it sends
   a [Nack] and closes the underlying connection. The outcome of both
   operations is deliberately ignored. *)
let kick (fd, _info, cryptobox_data) =
  Ack.write fd cryptobox_data Nack >>= fun _nack_outcome ->
  P2p_io_scheduler.close fd >>= fun _close_outcome ->
  Lwt.return_unit
(* First step of the handshake: both parties introduce themselves, so
   the exchange is the same whether we initiated the connection or are
   checking an incoming one.

   [authenticate ~proof_of_work_target ~incoming fd point ?listening_port
   identity supported_versions] sends our connection message, reads the
   peer's, and returns the peer's connection info together with the
   [authenticated_fd] to pass to [accept] or [kick].

   Fails with [P2p_errors.Myself] when the peer presents our own peer id,
   with [P2p_errors.Not_enough_proof_of_work] when its stamp does not
   meet [proof_of_work_target], or with any error raised while writing
   or reading the connection messages. *)
let authenticate
    ~proof_of_work_target
    ~incoming fd (remote_addr, remote_socket_port as point)
    ?listening_port identity supported_versions =
  let local_nonce_seed = Crypto_box.random_nonce () in
  lwt_debug "Sending authenfication to %a" P2p_point.Id.pp point >>= fun () ->
  let greeting =
    { Connection_message.port = listening_port ;
      versions = supported_versions ;
      public_key = identity.P2p_identity.public_key ;
      proof_of_work_stamp = identity.P2p_identity.proof_of_work_stamp ;
      message_nonce = local_nonce_seed } in
  Connection_message.write fd greeting >>=? fun sent_msg ->
  Connection_message.read fd >>=? fun (msg, recv_msg) ->
  (* For an incoming connection the listening port is the one the peer
     advertised; otherwise it is the port we connected to. *)
  let remote_listening_port =
    match incoming with
    | true -> msg.port
    | false -> Some remote_socket_port in
  let id_point = (remote_addr, remote_listening_port) in
  let remote_peer_id = Crypto_box.hash msg.public_key in
  let is_other_peer = remote_peer_id <> identity.P2p_identity.peer_id in
  fail_unless is_other_peer (P2p_errors.Myself id_point) >>=? fun () ->
  let has_enough_pow =
    Crypto_box.check_proof_of_work
      msg.public_key msg.proof_of_work_stamp proof_of_work_target in
  fail_unless has_enough_pow
    (P2p_errors.Not_enough_proof_of_work remote_peer_id) >>=? fun () ->
  let channel_key =
    Crypto_box.precompute identity.P2p_identity.secret_key msg.public_key in
  (* Both nonces are derived from the raw connection messages. *)
  let (local_nonce, remote_nonce) =
    Crypto_box.generate_nonces ~incoming ~sent_msg ~recv_msg in
  let cryptobox_data = { Crypto.channel_key ; local_nonce ; remote_nonce } in
  let info =
    { P2p_connection.Info.peer_id = remote_peer_id ;
      versions = msg.versions ;
      incoming ;
      id_point ;
      remote_socket_port ;
    } in
  return (info, (fd, info, cryptobox_data))
(* State shared by the reader and the writer of an accepted connection. *)
type connection = {
  (* Identifier obtained from [next_conn_id]; compared by [equal]. *)
  id : int ;
  (* What [authenticate] learnt about the remote peer. *)
  info : P2p_connection.Info.t ;
  (* Underlying scheduler connection. *)
  fd : P2p_io_scheduler.connection ;
  (* Channel key and nonces used to encrypt and decrypt chunks. *)
  cryptobox_data : Crypto.data ;
}
2017-03-14 13:51:44 +04:00
(* [next_conn_id ()] returns a fresh connection identifier: 1 on the
   first call, then 2, 3, ... The counter is private to this closure. *)
let next_conn_id =
  let counter = ref 0 in
  fun () ->
    counter := !counter + 1 ;
    !counter
2017-01-14 16:14:02 +04:00
(* Background worker that reads encrypted chunks from a connection,
   decodes them incrementally and pushes the decoded messages in a pipe. *)
module Reader = struct

  type 'msg t = {
    canceler : Lwt_canceler.t ;
    conn : connection ;
    encoding : 'msg Data_encoding.t ;
    (* Decoded messages paired with their size, or the error that
       stopped the reader. *)
    messages : (int * 'msg) tzresult Lwt_pipe.t ;
    mutable worker : unit Lwt.t ;
  }

  (* [read_message st init] feeds chunks read from the connection to the
     incremental decoder until it yields a message.

     Returns [Some (message, size, remaining_stream)] on success and
     [None] when the decoder reports an error. Errors from reading a
     chunk (including cancellation) are propagated. *)
  let read_message st init =
    let rec decode status =
      Lwt_unix.yield () >>= fun () ->
      let open Data_encoding.Binary in
      match status with
      | Success { result ; size ; stream } ->
          return (Some (result, size, stream))
      | Error _ ->
          lwt_debug "[read_message] incremental decoding error" >>= fun () ->
          return None
      | Await decode_next_buf ->
          protect ~canceler:st.canceler
            (fun () ->
               Crypto.read_chunk st.conn.fd st.conn.cryptobox_data)
          >>=? fun chunk ->
          lwt_debug
            "reading %d bytes from %a"
            (MBytes.length chunk)
            P2p_connection.Info.pp st.conn.info >>= fun () ->
          decode (decode_next_buf chunk) in
    decode (Data_encoding.Binary.read_stream ?init st.encoding)

  (* Main loop: read a message, push it, and start again with what is
     left of the stream. A decoding failure is pushed as
     [P2p_errors.Decoding_error] and stops the loop; so does any other
     error, which is forwarded to the pipe unless it merely signals
     cancellation or a closed pipe. *)
  let rec worker_loop st stream =
    (* Push [item] in the pipe, unless canceled, then yield [outcome]. *)
    let push_then item outcome =
      protect ~canceler:st.canceler
        (fun () ->
           Lwt_pipe.push st.messages item >>= fun () ->
           return outcome) in
    let step =
      read_message st stream >>=? function
      | None ->
          push_then (Error [P2p_errors.Decoding_error]) None
      | Some (msg, size, stream) ->
          push_then (Ok (size, msg)) (Some stream) in
    step >>= function
    | Ok (Some stream) ->
        worker_loop st (Some stream)
    | Ok None ->
        Lwt_canceler.cancel st.canceler >>= fun () ->
        Lwt.return_unit
    | Error [Canceled | Exn Lwt_pipe.Closed] ->
        lwt_debug "connection closed to %a"
          P2p_connection.Info.pp st.conn.info >>= fun () ->
        Lwt.return_unit
    | Error _ as err ->
        Lwt_pipe.safe_push_now st.messages err ;
        Lwt_canceler.cancel st.canceler >>= fun () ->
        Lwt.return_unit

  (* [run ?size conn encoding canceler] creates the reader state and
     starts its worker. When [size] is given it bounds the pipe, each
     message being weighted by [compute_size]. Canceling [canceler]
     closes the pipe. *)
  let run ?size conn encoding canceler =
    let compute_size = function
      | Ok (size, _) ->
          (Sys.word_size / 8) * 11 + size + Lwt_pipe.push_overhead
      | Error _ ->
          (* Errors are only pushed when the socket is being closed:
             no need to account for them. *)
          0 in
    let size =
      match size with
      | None -> None
      | Some max -> Some (max, compute_size) in
    let st =
      { canceler ; conn ; encoding ;
        messages = Lwt_pipe.create ?size () ;
        worker = Lwt.return_unit ;
      } in
    Lwt_canceler.on_cancel st.canceler
      (fun () ->
         Lwt_pipe.close st.messages ;
         Lwt.return_unit) ;
    st.worker <-
      Lwt_utils.worker "reader"
        ~run:(fun () -> worker_loop st None)
        ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
    st

  (* [shutdown st] cancels the reader and waits for its worker. *)
  let shutdown st =
    Lwt_canceler.cancel st.canceler >>= fun () ->
    st.worker

end
(* Background worker that pops already-encoded messages from a pipe and
   writes them, chunk by chunk, on a connection. *)
module Writer = struct

  type 'msg t = {
    canceler : Lwt_canceler.t ;
    conn : connection ;
    encoding : 'msg Data_encoding.t ;
    (* Each item is a message split in chunks, with an optional wakener
       to notify once the message is written or dropped. *)
    messages : (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ;
    mutable worker : unit Lwt.t ;
    binary_chunks_size : int ; (* in bytes *)
  }

  (* [send_message st chunks] writes every chunk of a message, in order,
     stopping at the first error (cancellation included). *)
  let send_message st buf =
    let rec send = function
      | [] -> return ()
      | chunk :: rest ->
          protect ~canceler:st.canceler
            (fun () ->
               Crypto.write_chunk st.conn.fd st.conn.cryptobox_data chunk)
          >>=? fun () ->
          lwt_debug "writing %d bytes to %a"
            (MBytes.length chunk)
            P2p_connection.Info.pp st.conn.info >>= fun () ->
          send rest in
    send buf

  (* [encode_message st msg] serializes [msg] and cuts the result in
     chunks of at most [st.binary_chunks_size] bytes. Any exception
     raised while doing so is reported as [P2p_errors.Encoding_error]. *)
  let encode_message st msg =
    match
      let bytes = Data_encoding.Binary.to_bytes st.encoding msg in
      MBytes.cut st.binary_chunks_size bytes
    with
    | chunks -> ok chunks
    | exception _ -> error P2p_errors.Encoding_error

  (* Main loop: pop a message, send it, notify its wakener if any, and
     start again. The loop stops on cancellation, when the pipe is
     closed, or on a write error; in the last case the canceler is
     triggered. *)
  let rec worker_loop st =
    let notify wakener result =
      match wakener with
      | None -> ()
      | Some u -> Lwt.wakeup_later u result in
    Lwt_unix.yield () >>= fun () ->
    protect ~canceler:st.canceler
      (fun () -> Lwt_pipe.pop st.messages >>= return)
    >>= function
    | Error [Canceled | Exn Lwt_pipe.Closed] ->
        lwt_debug "connection closed to %a"
          P2p_connection.Info.pp st.conn.info >>= fun () ->
        Lwt.return_unit
    | Error err ->
        lwt_log_error
          "@[<v 2>error writing to %a@ %a@]"
          P2p_connection.Info.pp st.conn.info pp_print_error err >>= fun () ->
        Lwt_canceler.cancel st.canceler >>= fun () ->
        Lwt.return_unit
    | Ok (chunks, wakener) ->
        send_message st chunks >>= fun res ->
        begin match res with
          | Ok () ->
              notify wakener res ;
              worker_loop st
          | Error err ->
              (* Whatever the cause, the sender is told that the
                 connection is closed. *)
              notify wakener (Error [P2p_errors.Connection_closed]) ;
              begin match err with
                | [ Canceled | Exn Lwt_pipe.Closed ] ->
                    lwt_debug "connection closed to %a"
                      P2p_connection.Info.pp st.conn.info >>= fun () ->
                    Lwt.return_unit
                | [ P2p_errors.Connection_closed ] ->
                    lwt_debug "connection closed to %a"
                      P2p_connection.Info.pp st.conn.info >>= fun () ->
                    Lwt_canceler.cancel st.canceler >>= fun () ->
                    Lwt.return_unit
                | err ->
                    lwt_log_error
                      "@[<v 2>error writing to %a@ %a@]"
                      P2p_connection.Info.pp st.conn.info
                      pp_print_error err >>= fun () ->
                    Lwt_canceler.cancel st.canceler >>= fun () ->
                    Lwt.return_unit
              end
        end

  (* [run ?size ?binary_chunks_size conn encoding canceler] creates the
     writer state and starts its worker.

     [binary_chunks_size] is the on-wire size of a chunk, header and
     encryption overhead included; it defaults to the largest size
     allowed. An out-of-range value trips an assertion: callers can
     validate it beforehand with [check_binary_chunks_size].

     When [size] is given it bounds the pipe, each message being
     weighted by [compute_size]. Canceling [canceler] closes the pipe
     and wakes up every pending sender with an error. *)
  let run
      ?size ?binary_chunks_size
      conn encoding canceler =
    let binary_chunks_size =
      match binary_chunks_size with
      | None -> Crypto.max_content_length
      | Some size ->
          let size = size - Crypto_box.boxzerobytes - Crypto.header_length in
          assert (size > 0) ;
          assert (size <= Crypto.max_content_length) ;
          size
    in
    let compute_size =
      (* [Sys.word_size] is expressed in bits, whereas everything else
         summed here ([MBytes.length], [Lwt_pipe.push_overhead]) is in
         bytes, as in [Reader.run]: convert before adding. *)
      let word_bytes = Sys.word_size / 8 in
      let buf_list_size =
        List.fold_left
          (fun sz buf -> sz + MBytes.length buf + 2 * word_bytes) 0
      in
      function
      | buf_l, None ->
          word_bytes + buf_list_size buf_l + Lwt_pipe.push_overhead
      | buf_l, Some _ ->
          2 * word_bytes + buf_list_size buf_l + Lwt_pipe.push_overhead
    in
    let size =
      match size with
      | None -> None
      | Some max -> Some (max, compute_size) in
    let st =
      { canceler ; conn ; encoding ;
        messages = Lwt_pipe.create ?size () ;
        worker = Lwt.return_unit ;
        binary_chunks_size ;
      } in
    Lwt_canceler.on_cancel st.canceler
      (fun () ->
         Lwt_pipe.close st.messages ;
         (* Drain what is left so that no synchronous sender stays
            blocked forever. *)
         while not (Lwt_pipe.is_empty st.messages) do
           match Lwt_pipe.pop_now_exn st.messages with
           | _, None -> ()
           | _, Some u ->
               Lwt.wakeup_later u (Error [Exn Lwt_pipe.Closed])
         done ;
         Lwt.return_unit) ;
    st.worker <-
      Lwt_utils.worker "writer"
        ~run:(fun () -> worker_loop st)
        ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
    st

  (* [shutdown st] cancels the writer and waits for its worker. *)
  let shutdown st =
    Lwt_canceler.cancel st.canceler >>= fun () ->
    st.worker

end
(* An accepted connection carrying messages of type ['msg]: the shared
   connection state plus the reader and writer workers built by
   [accept]. *)
type 'msg t = {
conn : connection ;
reader : 'msg Reader.t ;
writer : 'msg Writer.t ;
}
2017-03-14 13:51:44 +04:00
(* Two connections are equal when they carry the same identifier. *)
let equal left right = left.conn.id = right.conn.id
(* Prints the connection info of the remote peer. *)
let pp ppf t = P2p_connection.Info.pp ppf t.conn.info
2017-01-14 16:14:02 +04:00
(* Connection info of the remote peer. *)
let info t = t.conn.info
(* Second step of the handshake: [accept authenticated_fd encoding]
   sends an [Ack], waits for the peer's answer and, when the peer
   accepted too, starts the reader and writer workers.

   If the ack exchange fails the connection is closed, and
   [P2p_errors.Connection_closed] is reported as
   [P2p_errors.Rejected_socket_connection] while
   [P2p_errors.Decipher_error] is reported as [P2p_errors.Invalid_auth].
   A [Nack] from the peer fails with
   [P2p_errors.Rejected_socket_connection].

   Canceling the shared canceler of the workers closes the connection. *)
let accept
    ?incoming_message_queue_size ?outgoing_message_queue_size
    ?binary_chunks_size (fd, info, cryptobox_data) encoding =
  let exchange_acks () =
    Ack.write fd cryptobox_data Ack >>=? fun () ->
    Ack.read fd cryptobox_data in
  let on_handshake_error err =
    P2p_io_scheduler.close fd >>= fun _ ->
    match err with
    | [ P2p_errors.Connection_closed ] ->
        fail P2p_errors.Rejected_socket_connection
    | [ P2p_errors.Decipher_error ] ->
        fail P2p_errors.Invalid_auth
    | err ->
        Lwt.return (Error err) in
  protect exchange_acks ~on_error:on_handshake_error >>=? fun accepted ->
  fail_unless accepted P2p_errors.Rejected_socket_connection >>=? fun () ->
  let canceler = Lwt_canceler.create () in
  let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
  let reader =
    Reader.run ?size:incoming_message_queue_size conn encoding canceler
  and writer =
    Writer.run
      ?size:outgoing_message_queue_size ?binary_chunks_size
      conn encoding canceler
  in
  let accepted_conn = { conn ; reader ; writer } in
  Lwt_canceler.on_cancel canceler
    (fun () ->
       P2p_io_scheduler.close fd >>= fun _ ->
       Lwt.return_unit) ;
  return accepted_conn
(* [catch_closed_pipe f] runs [f ()] and reports a closed pipe, whether
   raised as the exception [Lwt_pipe.Closed] or returned as the error
   [Exn Lwt_pipe.Closed], as [P2p_errors.Connection_closed]. Any other
   exception is wrapped in [Exn]; other results are returned as is. *)
let catch_closed_pipe f =
  let of_exception = function
    | Lwt_pipe.Closed -> fail P2p_errors.Connection_closed
    | exn -> fail (Exn exn) in
  Lwt.catch f of_exception >>= fun outcome ->
  match outcome with
  | Error [Exn Lwt_pipe.Closed] ->
      fail P2p_errors.Connection_closed
  | Error _ | Ok _ ->
      Lwt.return outcome
2017-01-14 16:14:02 +04:00
2017-11-11 06:34:12 +04:00
(* [pp_json encoding ppf msg] prints the JSON representation of [msg]
   built with [encoding]. *)
let pp_json encoding ppf msg =
  let json = Data_encoding.Json.construct encoding msg in
  Data_encoding.Json.pp ppf json
2017-11-11 06:34:12 +04:00
(* [write t msg] encodes [msg] and queues it for sending; it returns as
   soon as the message is in the writer's pipe. Fails with
   [P2p_errors.Encoding_error] or [P2p_errors.Connection_closed]. *)
let write { writer ; conn ; _ } msg =
  catch_closed_pipe
    (fun () ->
       debug "Sending message to %a: %a"
         P2p_peer.Id.pp_short conn.info.peer_id
         (pp_json writer.encoding) msg ;
       match Writer.encode_message writer msg with
       | Error _ as err ->
           Lwt.return err
       | Ok chunks ->
           Lwt_pipe.push writer.messages (chunks, None) >>= return)
2017-11-11 06:34:12 +04:00
(* [write_sync t msg] encodes [msg], queues it and waits until the
   writer worker reports the outcome of the write through the wakener
   attached to the queued message. *)
let write_sync { writer ; conn ; _ } msg =
  catch_closed_pipe
    (fun () ->
       let waiter, wakener = Lwt.wait () in
       debug "Sending message to %a: %a"
         P2p_peer.Id.pp_short conn.info.peer_id
         (pp_json writer.encoding) msg ;
       match Writer.encode_message writer msg with
       | Error _ as err ->
           Lwt.return err
       | Ok chunks ->
           Lwt_pipe.push writer.messages (chunks, Some wakener) >>= fun () ->
           waiter)
2017-11-11 06:34:12 +04:00
(* [write_now t msg] encodes [msg] and tries to queue it without
   blocking, returning the result of [Lwt_pipe.push_now]. Fails with
   [P2p_errors.Encoding_error], or with [P2p_errors.Connection_closed]
   when the pipe is closed. *)
let write_now { writer ; conn ; _ } msg =
  debug "Try sending message to %a: %a"
    P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ;
  match Writer.encode_message writer msg with
  | Error _ as err -> err
  | Ok chunks ->
      begin match Lwt_pipe.push_now writer.messages (chunks, None) with
        | pushed -> Ok pushed
        | exception Lwt_pipe.Closed ->
            Error [P2p_errors.Connection_closed]
      end
2017-01-14 16:14:02 +04:00
2017-04-18 20:32:31 +04:00
(* [split_bytes size bytes] cuts [bytes] in consecutive pieces of [size]
   bytes, the last one holding whatever remains. Input no longer than
   [size] (the empty buffer included) is returned as a single piece. *)
let rec split_bytes size bytes =
  let len = MBytes.length bytes in
  if len <= size then
    [bytes]
  else
    let tail = split_bytes size (MBytes.sub bytes size (len - size)) in
    MBytes.sub bytes 0 size :: tail
(* [raw_write_sync t bytes] queues already-encoded [bytes], split
   according to the writer's chunk size, and waits until the writer
   worker reports the outcome of the write. *)
let raw_write_sync { writer ; _ } bytes =
  let chunks = split_bytes writer.binary_chunks_size bytes in
  catch_closed_pipe
    (fun () ->
       let waiter, wakener = Lwt.wait () in
       Lwt_pipe.push writer.messages (chunks, Some wakener) >>= fun () ->
       waiter)
2017-01-14 16:14:02 +04:00
(* [is_readable t] is [true] when the reader's pipe is not empty. *)
let is_readable { reader ; _ } =
  let nothing_pending = Lwt_pipe.is_empty reader.messages in
  not nothing_pending
(* [wait_readable t] waits on [Lwt_pipe.values_available] for the
   reader's pipe; a closed pipe is reported as
   [P2p_errors.Connection_closed]. *)
let wait_readable { reader ; _ } =
  catch_closed_pipe
    (fun () ->
       Lwt_pipe.values_available reader.messages >>= fun available ->
       return available)
(* [read t] pops the next item of the reader's pipe, waiting if needed;
   a closed pipe is reported as [P2p_errors.Connection_closed]. *)
let read { reader ; _ } =
  catch_closed_pipe (fun () -> Lwt_pipe.pop reader.messages)
(* [read_now t] returns the result of [Lwt_pipe.pop_now] on the reader's
   pipe, or [Some (Error [Connection_closed])] when the pipe is closed. *)
let read_now { reader ; _ } =
  match Lwt_pipe.pop_now reader.messages with
  | item -> item
  | exception Lwt_pipe.Closed ->
      Some (Error [P2p_errors.Connection_closed])
2017-01-14 16:14:02 +04:00
(* I/O scheduler statistics of the underlying connection. *)
let stat t = P2p_io_scheduler.stat t.conn.fd
(* [close ?wait st] shuts down the reader and the writer, then closes
   the underlying connection (ignoring the outcome of that last step).

   With [~wait:true] both pipes are closed first and the writer worker
   is awaited before anything is canceled. *)
let close ?(wait = false) st =
  let writer_done =
    if wait then begin
      Lwt_pipe.close st.reader.messages ;
      Lwt_pipe.close st.writer.messages ;
      st.writer.worker
    end else
      Lwt.return_unit in
  writer_done >>= fun () ->
  Reader.shutdown st.reader >>= fun () ->
  Writer.shutdown st.writer >>= fun () ->
  P2p_io_scheduler.close st.conn.fd >>= fun _ ->
  Lwt.return_unit