P2p: use incremental encoding/decoding

OCamlPro-Iguernlala 2017-04-18 18:32:31 +02:00 committed by Grégoire Henry
parent 32fa712e13
commit fbacfff9e6
12 changed files with 192 additions and 46 deletions

View File

@ -73,6 +73,7 @@ let default_net_limits : P2p.limits = {
max_known_points = Some (400, 300) ;
max_known_peer_ids = Some (400, 300) ;
swap_linger = 30. ;
binary_chunks_size = None ;
}
let default_net = {
@ -116,11 +117,12 @@ let limit : P2p.limits Data_encoding.t =
incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_peer_ids_history_size ;
max_known_points ; max_known_peer_ids ;
swap_linger ;
swap_linger ; binary_chunks_size
} ->
( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed, swap_linger) ,
max_download_speed, max_upload_speed, swap_linger,
binary_chunks_size) ,
( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size,
@ -129,7 +131,8 @@ let limit : P2p.limits Data_encoding.t =
)))
(fun ( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed, swap_linger) ,
max_download_speed, max_upload_speed, swap_linger,
binary_chunks_size) ,
( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size,
@ -143,9 +146,11 @@ let limit : P2p.limits Data_encoding.t =
incoming_app_message_queue_size ;
incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_peer_ids_history_size ;
max_known_points ; max_known_peer_ids ; swap_linger })
max_known_points ; max_known_peer_ids ; swap_linger ;
binary_chunks_size
})
(merge_objs
(obj9
(obj10
(dft "authentification-timeout"
float default_net_limits.authentification_timeout)
(dft "min-connections" uint16
@ -160,7 +165,8 @@ let limit : P2p.limits Data_encoding.t =
default_net_limits.max_incoming_connections)
(opt "max-download-speed" int31)
(opt "max-upload-speed" int31)
(dft "swap-linger" float default_net_limits.swap_linger))
(dft "swap-linger" float default_net_limits.swap_linger)
(opt "binary-chunks-size" uint8))
(obj10
(dft "read-buffer-size" int31
default_net_limits.read_buffer_size)
@ -266,6 +272,7 @@ let update
?max_connections
?max_download_speed
?max_upload_speed
?binary_chunks_size
?peer_table_size
?expected_pow
?bootstrap_peers
@ -308,6 +315,8 @@ let update
max_known_peer_ids =
Utils.first_some
peer_table_size cfg.net.limits.max_known_peer_ids ;
binary_chunks_size =
Utils.map_option (fun x -> x lsl 10) binary_chunks_size ;
} in
let net : net = {
expected_pow =
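Note: the command-line value is given in kilobytes, while the internal limit is expressed in bytes; the `x lsl 10` above performs that conversion (a left shift by 10 bits multiplies by 1024). A minimal sketch of the conversion — `kib_to_bytes` is a hypothetical helper name, not part of the patch:

(* Sketch: convert a kilobyte count to bytes, as the update function
   above does with [x lsl 10]. *)
let kib_to_bytes kib = kib lsl 10

let () =
  assert (kib_to_bytes 1 = 1024) ;
  assert (kib_to_bytes 64 = 65536)  (* 64 kB, the protocol maximum *)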

View File

@ -56,6 +56,7 @@ val update:
?max_connections:int ->
?max_download_speed:int ->
?max_upload_speed:int ->
?binary_chunks_size:int ->
?peer_table_size:int ->
?expected_pow:float ->
?bootstrap_peers:string list ->

View File

@ -20,6 +20,7 @@ type t = {
max_connections: int option ;
max_download_speed: int option ;
max_upload_speed: int option ;
binary_chunks_size: int option ;
peer_table_size: int option ;
expected_pow: float option ;
peers: string list ;
@ -35,7 +36,7 @@ type t = {
let wrap
data_dir config_file
connections max_download_speed max_upload_speed
connections max_download_speed max_upload_speed binary_chunks_size
peer_table_size
listen_addr peers no_bootstrap_peers closed expected_pow
rpc_listen_addr rpc_tls
@ -66,6 +67,7 @@ let wrap
max_connections ;
max_download_speed ;
max_upload_speed ;
binary_chunks_size ;
expected_pow ;
peers ;
no_bootstrap_peers ;
@ -150,6 +152,14 @@ module Term = struct
Arg.(value & opt (some int) None &
info ~docs ~doc ~docv:"NUM" ["max-upload-speed"])
let binary_chunks_size =
let doc =
Format.sprintf
"Size limit (in kB) of binary blocks that are sent to other peers."
in
Arg.(value & opt (some int) None &
info ~docs ~doc ~docv:"NUM" ["binary-chunks-size"])
let peer_table_size =
let doc = "Maximum size of internal peer tables, \
used to store metadata/logs about a peer or about a \
@ -223,7 +233,7 @@ module Term = struct
let open Term in
const wrap $ data_dir $ config_file
$ connections
$ max_download_speed $ max_upload_speed
$ max_download_speed $ max_upload_speed $ binary_chunks_size
$ peer_table_size
$ listen_addr $ peers $ no_bootstrap_peers $ closed $ expected_pow
$ rpc_listen_addr $ rpc_tls
@ -241,7 +251,7 @@ let read_and_patch_config_file args =
end >>=? fun cfg ->
let { data_dir ;
min_connections ; expected_connections ; max_connections ;
max_download_speed ; max_upload_speed ;
max_download_speed ; max_upload_speed ; binary_chunks_size ;
peer_table_size ;
expected_pow ;
peers ; no_bootstrap_peers ;
@ -257,6 +267,7 @@ let read_and_patch_config_file args =
return @@
Node_config_file.update
?data_dir ?min_connections ?expected_connections ?max_connections
?max_download_speed ?max_upload_speed ?peer_table_size ?expected_pow
?max_download_speed ?max_upload_speed ?binary_chunks_size
?peer_table_size ?expected_pow
~bootstrap_peers ?listen_addr ?rpc_listen_addr
~closed ~cors_origins ~cors_headers ?rpc_tls ?log_output cfg

View File

@ -17,6 +17,7 @@ type t = {
max_connections: int option ;
max_download_speed: int option ;
max_upload_speed: int option ;
binary_chunks_size: int option ;
peer_table_size: int option ;
expected_pow: float option ;
peers: string list ;

View File

@ -69,6 +69,7 @@ type limits = {
swap_linger : float ;
binary_chunks_size : int option ;
}
let create_scheduler limits =
@ -104,6 +105,7 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched =
max_known_points = limits.max_known_points ;
max_known_peer_ids = limits.max_known_peer_ids ;
swap_linger = limits.swap_linger ;
binary_chunks_size = limits.binary_chunks_size ;
}
in
let pool =
@ -365,7 +367,13 @@ let check_limits =
fail_2 c.known_points_history_size
"known-points-history-size" >>=? fun () ->
fail_1 c.swap_linger
"swap-linger"
"swap-linger" >>=? fun () ->
begin
match c.binary_chunks_size with
| None -> return ()
| Some size -> P2p_connection.check_binary_chunks_size size
end >>=? fun () ->
return ()
let create ~config ~limits meta_cfg msg_cfg =
check_limits limits >>=? fun () ->

View File

@ -127,6 +127,10 @@ type limits = {
(** Peer swapping does not occur more than once during a timespan of
[swap_linger] seconds. *)
binary_chunks_size : int option ;
(** Size (in bytes) of binary blocks that are sent to other
peers. Default value is 64 kB. Max value is 64 kB. *)
}
type ('msg, 'meta) t
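Note: the new field is optional and expressed in bytes; leaving it at `None` keeps the 64 kB default. A hedged sketch of how a caller might request smaller chunks — `default_limits` is a hypothetical record of defaults providing the other fields, not something defined in this patch:

(* Sketch: ask for 32 kB chunks; [None] keeps the 64 kB default. *)
let limits_with_chunks : P2p.limits =
  { default_limits with binary_chunks_size = Some (32 * 1024) }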

View File

@ -31,13 +31,14 @@ type error += Decoding_error
type error += Myself of Id_point.t
type error += Not_enough_proof_of_work of Peer_id.t
type error += Invalid_auth
type error += Invalid_chunks_size of { value: int ; min: int ; max: int }
module Crypto = struct
let header_length = 2
let crypto_overhead = 18 (* FIXME import from Sodium.Box. *)
let max_content_length =
1 lsl (header_length * 8) - crypto_overhead
1 lsl (header_length * 8) - crypto_overhead - header_length
type data = {
channel_key : Crypto_box.channel_key ;
@ -78,6 +79,17 @@ module Crypto = struct
end
let check_binary_chunks_size size =
let value = size - Crypto.crypto_overhead - Crypto.header_length in
fail_unless
(value > 0 &&
value <= Crypto.max_content_length)
(Invalid_chunks_size
{ value = size ;
min = Crypto.(header_length + crypto_overhead + 1) ;
max = Crypto.(max_content_length + crypto_overhead + header_length)
})
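Note: with header_length = 2 and crypto_overhead = 18, a chunk of `size` bytes carries `size - 20` bytes of payload, so check_binary_chunks_size accepts sizes from 21 up to 65536 bytes (64 kB). This is also why the Chunked_message test further down uses ~binary_chunks_size:21, the smallest accepted value. The arithmetic, spelled out as a sketch:

(* Sketch of the bounds enforced above. *)
let header_length = 2        (* chunk length prefix *)
let crypto_overhead = 18     (* Crypto_box overhead *)
let max_content_length =
  1 lsl (header_length * 8) - crypto_overhead - header_length            (* 65516 *)
let min_chunk_size = header_length + crypto_overhead + 1                 (* 21 *)
let max_chunk_size = max_content_length + crypto_overhead + header_length (* 65536 *)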
module Connection_message = struct
type t = {
@ -226,28 +238,43 @@ module Reader = struct
mutable worker: unit Lwt.t ;
}
let rec read_message st buf =
return (Data_encoding.Binary.of_bytes st.encoding buf)
let rec worker_loop st =
let rec read_message st init_mbytes =
let rec loop status =
Lwt_unix.yield () >>= fun () ->
let open Data_encoding.Binary in
match status with
| Success { res ; res_len ; remaining } ->
return (Some (res, res_len, remaining))
| Error ->
lwt_debug "[read_message] incremental decoding error" >>= fun () ->
return None
| Await decode_next_buf ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf ->
let size = 6 * (Sys.word_size / 8) + MBytes.length buf in
lwt_debug "reading %d bytes from %a"
size Connection_info.pp st.conn.info >>= fun () ->
read_message st buf >>=? fun msg ->
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data
end >>=? fun buf ->
lwt_debug
"reading %d bytes from %a"
(MBytes.length buf) Connection_info.pp st.conn.info >>= fun () ->
loop (decode_next_buf buf) in
loop
(Data_encoding.Binary.read_stream_of_bytes ~init:init_mbytes st.encoding)
let rec worker_loop st init_mbytes =
Lwt_unix.yield () >>= fun () ->
begin
read_message st init_mbytes >>=? fun msg ->
match msg with
| None ->
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
return false
| Some msg ->
return None
| Some (msg, size, rem_mbytes) ->
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
return true
return (Some rem_mbytes)
end >>= function
| Ok true ->
worker_loop st
| Ok false ->
| Ok Some rem_mbytes ->
worker_loop st rem_mbytes
| Ok None ->
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
@ -276,7 +303,7 @@ module Reader = struct
end ;
st.worker <-
Lwt_utils.worker "reader"
(fun () -> worker_loop st)
(fun () -> worker_loop st [])
(fun () -> Canceler.cancel st.canceler) ;
st
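Note: the reader no longer decodes one complete buffer at a time. It starts a streaming decode with read_stream_of_bytes and, while the decoder answers Await, keeps reading encrypted chunks from the socket and feeding them in; Success yields the decoded message plus the leftover bytes that seed the next decode, and Error tears the connection down. A self-contained sketch of that state machine — the status type only mimics the one used above, and Lwt, logging and cancellation are omitted:

(* Sketch of the Await / Success / Error loop driven by the reader. *)
type ('a, 'chunk) status =
  | Success of { res : 'a ; res_len : int ; remaining : 'chunk list }
  | Await of ('chunk -> ('a, 'chunk) status)
  | Error

(* Feed chunks one at a time until the decoder succeeds or fails;
   [next_chunk] stands for reading one more chunk from the socket. *)
let rec drive next_chunk = function
  | Success { res ; remaining ; _ } -> Some (res, remaining)
  | Error -> None
  | Await feed -> drive next_chunk (feed (next_chunk ()))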
@ -292,12 +319,25 @@ module Writer = struct
canceler: Canceler.t ;
conn: connection ;
encoding: 'msg Data_encoding.t ;
messages: (MBytes.t * unit tzresult Lwt.u option) Lwt_pipe.t ;
messages: (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ;
mutable worker: unit Lwt.t ;
binary_chunks_size: int ; (* in bytes *)
}
let rec send_message st buf =
let rec loop = function
| [] -> return ()
| buf :: l ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf
end >>=? fun () ->
lwt_debug "writing %d bytes to %a"
(MBytes.length buf) Connection_info.pp st.conn.info >>= fun () ->
loop l in
loop buf
let encode_message st msg =
try ok (Data_encoding.Binary.to_bytes st.encoding msg)
try ok (Data_encoding.Binary.to_bytes_list st.binary_chunks_size st.encoding msg)
with _ -> error Encoding_error
let rec worker_loop st =
@ -316,11 +356,7 @@ module Writer = struct
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| Ok (buf, wakener) ->
lwt_debug "writing %d bytes to %a"
(MBytes.length buf) Connection_info.pp st.conn.info >>= fun () ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf
end >>= fun res ->
send_message st buf >>= fun res ->
match res with
| Ok () ->
iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ;
@ -348,16 +384,34 @@ module Writer = struct
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
let run ?size conn encoding canceler =
let compute_size = function
| buf, None -> Sys.word_size + MBytes.length buf
| buf, Some _ -> 2 * Sys.word_size + MBytes.length buf
let run
?size ?binary_chunks_size
conn encoding canceler =
let binary_chunks_size =
match binary_chunks_size with
| None -> Crypto.max_content_length
| Some size ->
let size = size - Crypto.crypto_overhead - Crypto.header_length in
assert (size > 0) ;
assert (size <= Crypto.max_content_length) ;
size
in
let compute_size =
let buf_list_size =
List.fold_left
(fun sz buf ->
sz + MBytes.length buf + 2 * Sys.word_size) 0
in
function
| buf_l, None -> Sys.word_size + buf_list_size buf_l
| buf_l, Some _ -> 2 * Sys.word_size + buf_list_size buf_l
in
let size = map_option size ~f:(fun max -> max, compute_size) in
let st =
{ canceler ; conn ; encoding ;
messages = Lwt_pipe.create ?size () ;
worker = Lwt.return_unit ;
binary_chunks_size = binary_chunks_size ;
} in
Canceler.on_cancel st.canceler begin fun () ->
Lwt_pipe.close st.messages ;
@ -388,13 +442,12 @@ type 'msg t = {
let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2
let pp ppf { conn } = Connection_info.pp ppf conn.info
let info { conn } = conn.info
let accept
?incoming_message_queue_size ?outgoing_message_queue_size
(fd, info, cryptobox_data) encoding =
?binary_chunks_size (fd, info, cryptobox_data) encoding =
Lwt_utils.protect begin fun () ->
Ack.write fd cryptobox_data Ack >>=? fun () ->
Ack.read fd cryptobox_data
@ -407,11 +460,14 @@ let accept
end >>=? fun accepted ->
fail_unless accepted Rejected >>=? fun () ->
let canceler = Canceler.create () in
let conn = { id = next_conn_id (); fd ; info ; cryptobox_data } in
let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
let reader =
Reader.run ?size:incoming_message_queue_size conn encoding canceler
and writer =
Writer.run ?size:outgoing_message_queue_size conn encoding canceler in
Writer.run
?size:outgoing_message_queue_size ?binary_chunks_size
conn encoding canceler
in
let conn = { conn ; reader ; writer } in
Canceler.on_cancel canceler begin fun () ->
P2p_io_scheduler.close fd >>= fun _ ->
@ -445,7 +501,15 @@ let write_now { writer } msg =
try Ok (Lwt_pipe.push_now writer.messages (buf, None))
with Lwt_pipe.Closed -> Error [P2p_io_scheduler.Connection_closed]
let rec split_bytes size bytes =
if MBytes.length bytes <= size then
[bytes]
else
MBytes.sub bytes 0 size ::
split_bytes size (MBytes.sub bytes size (MBytes.length bytes - size))
let raw_write_sync { writer } bytes =
let bytes = split_bytes writer.binary_chunks_size bytes in
catch_closed_pipe begin fun () ->
let waiter, wakener = Lwt.wait () in
Lwt_pipe.push writer.messages (bytes, Some wakener) >>= fun () ->
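Note: outgoing buffers are now sliced before encryption — encoded messages go through to_bytes_list and raw writes through split_bytes above — so each piece handed to Crypto.write_chunk carries at most binary_chunks_size bytes of payload. A self-contained sketch of the slicing over Stdlib's Bytes (MBytes is project-specific):

(* Sketch: cut a buffer into pieces of at most [size] bytes, mirroring
   [split_bytes] above. *)
let rec split_bytes size bytes =
  if Bytes.length bytes <= size then [ bytes ]
  else
    Bytes.sub bytes 0 size
    :: split_bytes size (Bytes.sub bytes size (Bytes.length bytes - size))

let () =
  let chunks = split_bytes 21 (Bytes.create 100) in
  assert (List.length chunks = 5) ;                 (* 21 + 21 + 21 + 21 + 16 *)
  assert (Bytes.length (List.nth chunks 4) = 16)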

View File

@ -27,6 +27,7 @@ type error += Rejected
type error += Myself of Id_point.t
type error += Not_enough_proof_of_work of Peer_id.t
type error += Invalid_auth
type error += Invalid_chunks_size of { value: int ; min: int ; max: int }
type authenticated_fd
(** Type of a connection that successfully passed the authentication
@ -64,11 +65,15 @@ val kick: authenticated_fd -> unit Lwt.t
val accept:
?incoming_message_queue_size:int ->
?outgoing_message_queue_size:int ->
?binary_chunks_size: int ->
authenticated_fd -> 'msg Data_encoding.t -> 'msg t tzresult Lwt.t
(** (Low-level) (Cancelable) Accepts a remote peer given an
authenticated_fd. Used in [P2p_connection_pool], to promote an
[authenticated_fd] to the status of an active peer. *)
val check_binary_chunks_size: int -> unit tzresult Lwt.t
(** Precheck for the [?binary_chunks_size] parameter of [accept]. *)
(** {1 IO functions on connections} *)
(** {2 Output functions} *)

View File

@ -40,6 +40,7 @@ module Message = struct
let encoding msg_encoding =
let open Data_encoding in
dynamic_size @@
union ~tag_size:`Uint16
([ case ~tag:0x01 null
(function Disconnect -> Some () | _ -> None)
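Note: wrapping the message union in dynamic_size prefixes every encoded message with its length, which helps keep messages self-delimiting now that they arrive split across several chunks and are reassembled by the streaming decoder. A minimal sketch, assuming the usual Data_encoding combinator:

(* Sketch: [dynamic_size] adds a length prefix so a variable-length
   payload becomes self-delimiting on the wire. *)
let framed_string = Data_encoding.(dynamic_size string)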
@ -329,6 +330,8 @@ type config = {
max_known_peer_ids : (int * int) option ; (* max, gc target *)
swap_linger : float ;
binary_chunks_size : int option ;
}
type 'meta meta_config = {
@ -835,6 +838,7 @@ and authenticate pool ?point_info canceler fd point =
P2p_connection.accept
?incoming_message_queue_size:pool.config.incoming_message_queue_size
?outgoing_message_queue_size:pool.config.outgoing_message_queue_size
?binary_chunks_size:pool.config.binary_chunks_size
auth_fd pool.encoding >>= fun conn ->
lwt_debug "authenticate: %a -> Connected %a"
Point.pp point

View File

@ -113,6 +113,9 @@ type config = {
(** Peer swapping does not occur more than once during a timespan of
[swap_linger] seconds. *)
binary_chunks_size : int option ;
(** Size (in bytes) of binary blocks that are sent to other
peers. Default value is 64 kB. *)
}
type 'meta meta_config = {
@ -254,7 +257,8 @@ val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
where [conn'] is the internal [P2p_connection.t] inside [conn]. *)
(**/**)
val raw_write_sync: ('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t
val raw_write_sync:
('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t
(**/**)
val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult

View File

@ -235,6 +235,39 @@ module Simple_message = struct
end
module Chunked_message = struct
let encoding = Data_encoding.bytes
let simple_msg = MBytes.create (1 lsl 8)
let simple_msg2 = MBytes.create (1 lsl 8)
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_connection.accept
~binary_chunks_size:21 auth_fd encoding >>=? fun conn ->
P2p_connection.write_sync conn simple_msg >>=? fun () ->
P2p_connection.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
sync ch >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept
~binary_chunks_size:21 auth_fd encoding >>=? fun conn ->
P2p_connection.write_sync conn simple_msg2 >>=? fun () ->
P2p_connection.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
sync ch >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
return ()
let run _dir = run_nodes client server
end
module Close_on_read = struct
let encoding = Data_encoding.bytes
@ -346,6 +379,7 @@ let main () =
"kick", Kick.run ;
"kicked", Kicked.run ;
"simple-message", Simple_message.run ;
"chunked-message", Chunked_message.run ;
"close-on-read", Close_on_read.run ;
"close-on-write", Close_on_write.run ;
"garbled-data", Garbled_data.run ;

View File

@ -81,6 +81,7 @@ let detach_node f points n =
max_known_points = None ;
max_known_peer_ids = None ;
swap_linger = 0. ;
binary_chunks_size = None
} in
Process.detach
~prefix:(Format.asprintf "%a: " Peer_id.pp_short identity.peer_id)