Lwt_pipe: limit by content size in bytes
This commit is contained in:
parent
6efa84fa37
commit
1e4d090e2c
@ -218,7 +218,7 @@ module Reader = struct
|
||||
canceler: Canceler.t ;
|
||||
conn: connection ;
|
||||
encoding: 'msg Data_encoding.t ;
|
||||
messages: 'msg tzresult Lwt_pipe.t ;
|
||||
messages: (int * 'msg) tzresult Lwt_pipe.t ;
|
||||
mutable worker: unit Lwt.t ;
|
||||
}
|
||||
|
||||
@ -229,13 +229,15 @@ module Reader = struct
|
||||
Lwt_unix.yield () >>= fun () ->
|
||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
||||
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf ->
|
||||
read_message st buf
|
||||
let size = 6 * (Sys.word_size / 8) + MBytes.length buf in
|
||||
read_message st buf >>|? fun msg ->
|
||||
size, msg
|
||||
end >>= function
|
||||
| Ok None ->
|
||||
| Ok (_, None) ->
|
||||
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
|
||||
worker_loop st
|
||||
| Ok (Some msg) ->
|
||||
Lwt_pipe.push st.messages (Ok msg) >>= fun () ->
|
||||
| Ok (size, Some msg) ->
|
||||
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
|
||||
worker_loop st
|
||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
||||
Lwt.return_unit
|
||||
@ -348,9 +350,24 @@ let accept
|
||||
let canceler = Canceler.create () in
|
||||
let conn = { fd ; info ; cryptobox_data } in
|
||||
let reader =
|
||||
Reader.run ?size:incoming_message_queue_size conn encoding canceler
|
||||
let compute_size = function
|
||||
| Ok (size, _) -> (Sys.word_size / 8) * 11 + size
|
||||
| Error err -> (Sys.word_size / 8) * (3 + 3 * (List.length err))
|
||||
in
|
||||
let size = map_option incoming_message_queue_size
|
||||
~f:(fun qs -> (qs, compute_size)) in
|
||||
Reader.run ?size conn encoding canceler
|
||||
and writer =
|
||||
Writer.run ?size:outgoing_message_queue_size conn encoding canceler in
|
||||
let compute_size = function
|
||||
| msg, None ->
|
||||
10 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg
|
||||
| msg, Some _ ->
|
||||
18 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg
|
||||
in
|
||||
let size = map_option outgoing_message_queue_size
|
||||
~f:(fun qs -> qs, compute_size)
|
||||
in
|
||||
Writer.run ?size conn encoding canceler in
|
||||
let conn = { conn ; reader ; writer } in
|
||||
Canceler.on_cancel canceler begin fun () ->
|
||||
P2p_io_scheduler.close fd >>= fun _ ->
|
||||
@ -367,10 +384,6 @@ let catch_closed_pipe f =
|
||||
| exn -> fail (Exn exn)
|
||||
end
|
||||
|
||||
let is_writable { writer } =
|
||||
not (Lwt_pipe.is_full writer.messages)
|
||||
let wait_writable { writer } =
|
||||
Lwt_pipe.not_full writer.messages
|
||||
let write { writer } msg =
|
||||
catch_closed_pipe begin fun () ->
|
||||
Lwt_pipe.push writer.messages (msg, None) >>= return
|
||||
|
@ -71,14 +71,6 @@ val accept:
|
||||
|
||||
(** {2 Output functions} *)
|
||||
|
||||
val is_writable: 'msg t -> bool
|
||||
(** [is_writable conn] is [true] iff [conn] internal write queue is
|
||||
not full. *)
|
||||
|
||||
val wait_writable: 'msg t -> unit Lwt.t
|
||||
(** (Cancelable) [wait_writable conn] returns when [conn]'s internal
|
||||
write queue becomes writable (i.e. not full). *)
|
||||
|
||||
val write: 'msg t -> 'msg -> unit tzresult Lwt.t
|
||||
(** [write conn msg] returns when [msg] has successfully been added to
|
||||
[conn]'s internal write queue or fails with a corresponding
|
||||
@ -103,12 +95,12 @@ val wait_readable: 'msg t -> unit tzresult Lwt.t
|
||||
(** (Cancelable) [wait_readable conn] returns when [conn]'s internal
|
||||
read queue becomes readable (i.e. not empty). *)
|
||||
|
||||
val read: 'msg t -> 'msg tzresult Lwt.t
|
||||
val read: 'msg t -> (int * 'msg) tzresult Lwt.t
|
||||
(** [read conn msg] returns when [msg] has successfully been popped
|
||||
from [conn]'s internal read queue or fails with a corresponding
|
||||
error. *)
|
||||
|
||||
val read_now: 'msg t -> 'msg tzresult option
|
||||
val read_now: 'msg t -> (int * 'msg) tzresult option
|
||||
(** [read_now conn msg] is [Some msg] if [conn]'s internal read queue
|
||||
is not empty, [None] if it is empty, or fails with a correponding
|
||||
error otherwise. *)
|
||||
|
@ -65,7 +65,7 @@ module Answerer = struct
|
||||
type 'msg callback = {
|
||||
bootstrap: unit -> Point.t list Lwt.t ;
|
||||
advertise: Point.t list -> unit Lwt.t ;
|
||||
message: 'msg -> unit Lwt.t ;
|
||||
message: int -> 'msg -> unit Lwt.t ;
|
||||
}
|
||||
|
||||
type 'msg t = {
|
||||
@ -80,7 +80,7 @@ module Answerer = struct
|
||||
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|
||||
P2p_connection.read st.conn
|
||||
end >>= function
|
||||
| Ok Bootstrap -> begin
|
||||
| Ok (_, Bootstrap) -> begin
|
||||
st.callback.bootstrap () >>= function
|
||||
| [] ->
|
||||
worker_loop st
|
||||
@ -93,13 +93,13 @@ module Answerer = struct
|
||||
Canceler.cancel st.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
end
|
||||
| Ok (Advertise points) ->
|
||||
| Ok (_, Advertise points) ->
|
||||
st.callback.advertise points >>= fun () ->
|
||||
worker_loop st
|
||||
| Ok (Message msg) ->
|
||||
st.callback.message msg >>= fun () ->
|
||||
| Ok (size, Message msg) ->
|
||||
st.callback.message size msg >>= fun () ->
|
||||
worker_loop st
|
||||
| Ok Disconnect | Error [P2p_io_scheduler.Connection_closed] ->
|
||||
| Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] ->
|
||||
Canceler.cancel st.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error [Lwt_utils.Canceled] ->
|
||||
@ -181,7 +181,7 @@ and events = {
|
||||
|
||||
and ('msg, 'meta) connection = {
|
||||
canceler : Canceler.t ;
|
||||
messages : 'msg Lwt_pipe.t ;
|
||||
messages : (int * 'msg) Lwt_pipe.t ;
|
||||
conn : 'msg Message.t P2p_connection.t ;
|
||||
gid_info : (('msg, 'meta) connection, 'meta) Gid_info.t ;
|
||||
point_info : ('msg, 'meta) connection Point_info.t option ;
|
||||
@ -248,10 +248,12 @@ let active_connections pool = Gid.Table.length pool.connected_gids
|
||||
let create_connection pool conn id_point pi gi =
|
||||
let gid = Gid_info.gid gi in
|
||||
let canceler = Canceler.create () in
|
||||
let messages =
|
||||
Lwt_pipe.create ?size:pool.config.incoming_app_message_queue_size () in
|
||||
let size = map_option pool.config.incoming_app_message_queue_size
|
||||
~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * (11 + size))
|
||||
in
|
||||
let messages = Lwt_pipe.create ?size () in
|
||||
let callback =
|
||||
{ Answerer.message = Lwt_pipe.push messages ;
|
||||
{ Answerer.message = (fun size msg -> Lwt_pipe.push messages (size, msg)) ;
|
||||
advertise = register_new_points pool gid ;
|
||||
bootstrap = list_known_points pool gid ;
|
||||
} in
|
||||
@ -471,7 +473,7 @@ let accept pool fd point =
|
||||
|
||||
let read { messages } =
|
||||
Lwt.catch
|
||||
(fun () -> Lwt_pipe.pop messages >>= return)
|
||||
(fun () -> Lwt_pipe.pop messages >>= fun ( _, msg) -> return msg)
|
||||
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
|
||||
|
||||
let is_readable { messages } =
|
||||
|
@ -315,6 +315,12 @@ let create
|
||||
|
||||
exception Closed
|
||||
|
||||
let read_size_fun = function
|
||||
| Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf
|
||||
| Error exns -> (Sys.word_size / 8) * (1 + 3 * (List.length exns))
|
||||
|
||||
let write_size_fun mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes
|
||||
|
||||
let register =
|
||||
let cpt = ref 0 in
|
||||
fun st conn ->
|
||||
@ -324,8 +330,10 @@ let register =
|
||||
end else begin
|
||||
let id = incr cpt; !cpt in
|
||||
let canceler = Canceler.create () in
|
||||
let read_queue = Lwt_pipe.create ?size:st.read_queue_size ()
|
||||
and write_queue = Lwt_pipe.create ?size:st.write_queue_size () in
|
||||
let read_size_arg = map_option st.read_queue_size ~f:(fun v -> v, read_size_fun) in
|
||||
let write_size_arg = map_option st.write_queue_size ~f:(fun v -> v, write_size_fun) in
|
||||
let read_queue = Lwt_pipe.create ?size:read_size_arg () in
|
||||
let write_queue = Lwt_pipe.create ?size:write_size_arg () in
|
||||
let read_conn =
|
||||
ReadScheduler.create_connection
|
||||
st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id
|
||||
|
@ -41,7 +41,7 @@ val create:
|
||||
(** [create ~max_upload_speed ~max_download_speed ~read_queue_size
|
||||
~write_queue_size ()] is an IO scheduler with specified (global)
|
||||
max upload (resp. download) speed, and specified read
|
||||
(resp. write) queue sizes for connections. *)
|
||||
(resp. write) queue sizes (in bytes) for connections. *)
|
||||
|
||||
val register: t -> Lwt_unix.file_descr -> connection
|
||||
(** [register sched fd] is a [connection] managed by [sched]. *)
|
||||
|
@ -10,25 +10,29 @@
|
||||
open Lwt.Infix
|
||||
|
||||
type 'a t =
|
||||
{ queue : 'a Queue.t ;
|
||||
size : int option ;
|
||||
{ queue : (int * 'a) Queue.t ;
|
||||
mutable current_size : int ;
|
||||
max_size : int ;
|
||||
compute_size : ('a -> int) ;
|
||||
mutable closed : bool ;
|
||||
mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ;
|
||||
mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ;
|
||||
empty: unit Lwt_condition.t ;
|
||||
full: unit Lwt_condition.t ;
|
||||
not_full : unit Lwt_condition.t ;
|
||||
}
|
||||
|
||||
let create ?size () =
|
||||
let max_size, compute_size = match size with
|
||||
| None -> max_int, (fun _ -> 0)
|
||||
| Some (max_size, compute_size) ->
|
||||
max_size, (fun e -> 4 * (Sys.word_size / 8) + compute_size e) in
|
||||
{ queue = Queue.create () ;
|
||||
size ;
|
||||
current_size = 0 ;
|
||||
max_size ;
|
||||
compute_size ;
|
||||
closed = false ;
|
||||
push_waiter = None ;
|
||||
pop_waiter = None ;
|
||||
empty = Lwt_condition.create () ;
|
||||
full = Lwt_condition.create () ;
|
||||
not_full = Lwt_condition.create () ;
|
||||
}
|
||||
|
||||
let notify_push q =
|
||||
@ -61,49 +65,41 @@ let wait_pop q =
|
||||
q.pop_waiter <- Some (waiter, wakener) ;
|
||||
Lwt.protected waiter
|
||||
|
||||
let available_space { size } len =
|
||||
match size with
|
||||
| None -> true
|
||||
| Some size -> len < size
|
||||
|
||||
let length { queue } = Queue.length queue
|
||||
let is_empty { queue } = Queue.is_empty queue
|
||||
let is_full ({ queue } as q) = not (available_space q (Queue.length queue))
|
||||
|
||||
let rec empty q =
|
||||
if is_empty q
|
||||
then Lwt.return_unit
|
||||
else (Lwt_condition.wait q.empty >>= fun () -> empty q)
|
||||
let rec full q =
|
||||
if is_full q
|
||||
then Lwt.return_unit
|
||||
else (Lwt_condition.wait q.full >>= fun () -> full q)
|
||||
let rec not_full q =
|
||||
if not (is_empty q)
|
||||
then Lwt.return_unit
|
||||
else (Lwt_condition.wait q.not_full >>= fun () -> not_full q)
|
||||
|
||||
exception Closed
|
||||
|
||||
let rec push ({ closed ; queue ; full } as q) elt =
|
||||
let len = Queue.length queue in
|
||||
let rec push ({ closed ; queue ; current_size ;
|
||||
max_size ; compute_size} as q) elt =
|
||||
if closed then Lwt.fail Closed
|
||||
else if available_space q len then begin
|
||||
Queue.push elt queue ;
|
||||
else
|
||||
let elt_size = compute_size elt in
|
||||
if current_size + elt_size < max_size then begin
|
||||
Queue.push (elt_size, elt) queue ;
|
||||
q.current_size <- current_size + elt_size ;
|
||||
notify_push q ;
|
||||
(if not (available_space q (len + 1)) then Lwt_condition.signal full ());
|
||||
Lwt.return_unit
|
||||
end else
|
||||
end
|
||||
else
|
||||
wait_pop q >>= fun () ->
|
||||
push q elt
|
||||
|
||||
let rec push_now ({ closed ; queue ; full } as q) elt =
|
||||
let rec push_now ({ closed ; queue ; compute_size ;
|
||||
current_size ; max_size
|
||||
} as q) elt =
|
||||
if closed then raise Closed ;
|
||||
let len = Queue.length queue in
|
||||
available_space q len && begin
|
||||
Queue.push elt queue ;
|
||||
let elt_size = compute_size elt in
|
||||
(current_size + elt_size < max_size)
|
||||
&& begin
|
||||
Queue.push (elt_size, elt) queue ;
|
||||
q.current_size <- current_size + elt_size ;
|
||||
notify_push q ;
|
||||
(if not (available_space q (len + 1)) then Lwt_condition.signal full ()) ;
|
||||
true
|
||||
end
|
||||
|
||||
@ -112,27 +108,11 @@ exception Full
|
||||
let push_now_exn q elt =
|
||||
if not (push_now q elt) then raise Full
|
||||
|
||||
let rec pop_all ({ closed ; queue ; empty ; not_full } as q) =
|
||||
let was_full = is_full q in
|
||||
let rec pop ({ closed ; queue ; empty ; current_size } as q) =
|
||||
if not (Queue.is_empty queue) then
|
||||
let queue_copy = Queue.copy queue in
|
||||
Queue.clear queue;
|
||||
let (elt_size, elt) = Queue.pop queue in
|
||||
notify_pop q ;
|
||||
(if was_full then Lwt_condition.signal not_full ());
|
||||
Lwt_condition.signal empty ();
|
||||
Lwt.return queue_copy
|
||||
else if closed then
|
||||
Lwt.fail Closed
|
||||
else
|
||||
wait_push q >>= fun () ->
|
||||
pop_all q
|
||||
|
||||
let rec pop ({ closed ; queue ; empty ; not_full } as q) =
|
||||
let was_full = is_full q in
|
||||
if not (Queue.is_empty queue) then
|
||||
let elt = Queue.pop queue in
|
||||
notify_pop q ;
|
||||
(if was_full then Lwt_condition.signal not_full ());
|
||||
q.current_size <- current_size - elt_size ;
|
||||
(if Queue.length queue = 0 then Lwt_condition.signal empty ());
|
||||
Lwt.return elt
|
||||
else if closed then
|
||||
@ -143,7 +123,7 @@ let rec pop ({ closed ; queue ; empty ; not_full } as q) =
|
||||
|
||||
let rec peek ({ closed ; queue } as q) =
|
||||
if not (Queue.is_empty queue) then
|
||||
let elt = Queue.peek queue in
|
||||
let (_elt_size, elt) = Queue.peek queue in
|
||||
Lwt.return elt
|
||||
else if closed then
|
||||
Lwt.fail Closed
|
||||
@ -153,28 +133,15 @@ let rec peek ({ closed ; queue } as q) =
|
||||
|
||||
exception Empty
|
||||
|
||||
let pop_now_exn ({ closed ; queue ; empty ; not_full } as q) =
|
||||
let was_full = is_full q in
|
||||
let pop_now_exn ({ closed ; queue ; empty ; current_size } as q) =
|
||||
if Queue.is_empty queue then
|
||||
(if closed then raise Closed else raise Empty) ;
|
||||
let elt = Queue.pop queue in
|
||||
(if was_full then Lwt_condition.signal not_full ());
|
||||
let (elt_size, elt) = Queue.pop queue in
|
||||
(if Queue.length queue = 0 then Lwt_condition.signal empty ());
|
||||
q.current_size <- current_size - elt_size ;
|
||||
notify_pop q ;
|
||||
elt
|
||||
|
||||
let pop_all_now ({ closed ; queue ; empty ; not_full } as q) =
|
||||
let was_empty = is_empty q in
|
||||
let was_full = is_full q in
|
||||
if Queue.is_empty queue then
|
||||
(if closed then raise Closed else raise Empty) ;
|
||||
let queue_copy = Queue.copy queue in
|
||||
Queue.clear queue ;
|
||||
(if was_full then Lwt_condition.signal not_full ());
|
||||
(if not was_empty then Lwt_condition.signal empty ());
|
||||
notify_pop q ;
|
||||
queue_copy
|
||||
|
||||
let pop_now q =
|
||||
match pop_now_exn q with
|
||||
| exception Empty -> None
|
||||
@ -195,7 +162,6 @@ let close q =
|
||||
q.closed <- true ;
|
||||
notify_push q ;
|
||||
notify_pop q ;
|
||||
Lwt_condition.broadcast_exn q.full Closed ;
|
||||
end
|
||||
|
||||
let rec iter q ~f =
|
||||
|
@ -14,18 +14,15 @@
|
||||
type 'a t
|
||||
(** Type of queues holding values of type ['a]. *)
|
||||
|
||||
val create : ?size:int -> unit -> 'a t
|
||||
(** [create ~size] is an empty queue that can hold max [size]
|
||||
elements. *)
|
||||
val create : ?size:(int * ('a -> int)) -> unit -> 'a t
|
||||
(** [create ~size:(max_size, compute_size)] is an empty queue that can
|
||||
hold max [size] bytes of data, using [compute_size] to compute the
|
||||
size of a datum. *)
|
||||
|
||||
val push : 'a t -> 'a -> unit Lwt.t
|
||||
(** [push q v] is a thread that blocks while [q] contains more
|
||||
than [size] elements, then adds [v] at the end of [q]. *)
|
||||
|
||||
val pop_all : 'a t -> 'a Queue.t Lwt.t
|
||||
(** [pop' q] is a thread that returns all elements in [q] or waits
|
||||
till there is at least one element in [q]. *)
|
||||
|
||||
val pop : 'a t -> 'a Lwt.t
|
||||
(** [pop q] is a thread that blocks while [q] is empty, then
|
||||
removes and returns the first element in [q]. *)
|
||||
@ -48,10 +45,6 @@ val push_now_exn : 'a t -> 'a -> unit
|
||||
(** [push_now q v] adds [v] at the ends of [q] immediately or
|
||||
raise [Full] if [q] is currently full. *)
|
||||
|
||||
val pop_all_now : 'a t -> 'a Queue.t
|
||||
(** [pop_all_now q] is a copy of [q]'s internal queue, that may be
|
||||
empty. *)
|
||||
|
||||
val pop_now : 'a t -> 'a option
|
||||
(** [pop_now q] maybe removes and returns the first element in [q] if
|
||||
[q] contains at least one element. *)
|
||||
@ -68,18 +61,9 @@ val length : 'a t -> int
|
||||
val is_empty : 'a t -> bool
|
||||
(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
|
||||
|
||||
val is_full : 'a t -> bool
|
||||
(** [is_full q] is [true] if [q] is full, [false] otherwise. *)
|
||||
|
||||
val empty : 'a t -> unit Lwt.t
|
||||
(** [empty q] returns when [q] becomes empty. *)
|
||||
|
||||
val full : 'a t -> unit Lwt.t
|
||||
(** [full q] returns when [q] becomes full. *)
|
||||
|
||||
val not_full : 'a t -> unit Lwt.t
|
||||
(** [not_full q] returns when [q] stop being full. *)
|
||||
|
||||
val iter : 'a t -> f:('a -> unit Lwt.t) -> unit Lwt.t
|
||||
(** [iter q ~f] pops all elements of [q] and applies [f] on them. *)
|
||||
|
||||
|
@ -162,14 +162,14 @@ let client addr port =
|
||||
(* let's exchange a simple message. *)
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.read conn >>=? fun msg ->
|
||||
P2p_connection.read conn >>=? fun (msg_size, msg) ->
|
||||
assert (MBytes.compare simple_msg msg = 0) ;
|
||||
P2p_connection.close conn >>= fun _stat ->
|
||||
lwt_log_notice "Simple OK" >>= fun () ->
|
||||
(* let's detect a closed connection on `read`. *)
|
||||
connect sched addr port id2 >>=? fun auth_fd ->
|
||||
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|
||||
P2p_connection.read conn >>=? fun msg ->
|
||||
P2p_connection.read conn >>=? fun (msg_size, msg) ->
|
||||
assert (MBytes.compare simple_msg msg = 0) ;
|
||||
P2p_connection.read conn >>= fun msg ->
|
||||
assert (is_connection_closed msg) ;
|
||||
|
@ -170,8 +170,8 @@ let max_download_speed = ref None
|
||||
let max_upload_speed = ref None
|
||||
|
||||
let read_buffer_size = ref (1 lsl 14)
|
||||
let read_queue_size = ref (Some 1)
|
||||
let write_queue_size = ref (Some 1)
|
||||
let read_queue_size = ref (Some (4096 * 16))
|
||||
let write_queue_size = ref (Some (4096 * 16))
|
||||
|
||||
let delay = ref 60.
|
||||
let clients = ref 8
|
||||
|
Loading…
Reference in New Issue
Block a user