@ -10,8 +10,6 @@
(* TODO encode/encrypt before to push into the writer pipe. *)
(* TODO patch Sodium.Box to avoid allocation of the encrypted buffer.*)
(* TODO patch Data_encoding for continuation-based binary writer/reader. *)
(* TODO use queue bound by memory size of its elements, not by the
number of elements. *)
(* TODO test `close ~wait:true`. *)
(* TODO nothing in welcoming message proves that the incoming peer is
the owner of the public key... only the first message will
@ -218,7 +216,7 @@ module Reader = struct
canceler: Canceler.t ;
conn: connection ;
encoding: 'msg Data_encoding.t ;
messages: 'msg tzresult Lwt_pipe.t ;
messages: (int * 'msg) tzresult Lwt_pipe.t ;
mutable worker: unit Lwt.t ;
@ -229,13 +227,15 @@ module Reader = struct
Lwt_unix.yield () >>= fun () ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf ->
read_message st buf
let size = 6 * (Sys.word_size / 8) + MBytes.length buf in
read_message st buf >>|? fun msg ->
size, msg
end >>= function
| Ok None ->
| Ok (_, None) ->
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
worker_loop st
| Ok (Some msg) ->
Lwt_pipe.push st.messages (Ok msg) >>= fun () ->
| Ok (size, Some msg) ->
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
worker_loop st
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
@ -245,6 +245,11 @@ module Reader = struct
let run ?size conn encoding canceler =
let compute_size = function
| Ok (size, _) -> (Sys.word_size / 8) * 11 + size
| Error _ -> 0 (* we push Error only when we close the socket,
we don't fear memory leaks in that case... *) in
let size = map_option size ~f:(fun max -> (max, compute_size)) in
let st =
{ canceler ; conn ; encoding ;
messages = Lwt_pipe.create ?size () ;
@ -301,6 +306,13 @@ module Writer = struct
let run ?size conn encoding canceler =
let compute_size = function
| msg, None ->
10 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg
| msg, Some _ ->
18 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg
let size = map_option size ~f:(fun max -> max, compute_size) in
let st =
{ canceler ; conn ; encoding ;
messages = Lwt_pipe.create ?size () ;
@ -367,10 +379,6 @@ let catch_closed_pipe f =
| exn -> fail (Exn exn)
let is_writable { writer } =
not (Lwt_pipe.is_full writer.messages)
let wait_writable { writer } =
Lwt_pipe.not_full writer.messages
let write { writer } msg =
catch_closed_pipe begin fun () ->
Lwt_pipe.push writer.messages (msg, None) >>= return
@ -71,14 +71,6 @@ val accept:
(** {2 Output functions} *)
val is_writable: 'msg t -> bool
(** [is_writable conn] is [true] iff [conn] internal write queue is
not full. *)
val wait_writable: 'msg t -> unit Lwt.t
(** (Cancelable) [wait_writable conn] returns when [conn]'s internal
write queue becomes writable (i.e. not full). *)
val write: 'msg t -> 'msg -> unit tzresult Lwt.t
(** [write conn msg] returns when [msg] has successfully been added to
[conn]'s internal write queue or fails with a corresponding
@ -103,12 +95,12 @@ val wait_readable: 'msg t -> unit tzresult Lwt.t
(** (Cancelable) [wait_readable conn] returns when [conn]'s internal
read queue becomes readable (i.e. not empty). *)
val read: 'msg t -> 'msg tzresult Lwt.t
val read: 'msg t -> (int * 'msg) tzresult Lwt.t
(** [read conn msg] returns when [msg] has successfully been popped
from [conn]'s internal read queue or fails with a corresponding
error. *)
val read_now: 'msg t -> 'msg tzresult option
val read_now: 'msg t -> (int * 'msg) tzresult option
(** [read_now conn msg] is [Some msg] if [conn]'s internal read queue
is not empty, [None] if it is empty, or fails with a correponding
error otherwise. *)
@ -65,7 +65,7 @@ module Answerer = struct
type 'msg callback = {
bootstrap: unit -> Point.t list Lwt.t ;
advertise: Point.t list -> unit Lwt.t ;
message: 'msg -> unit Lwt.t ;
message: int -> 'msg -> unit Lwt.t ;
type 'msg t = {
@ -80,7 +80,7 @@ module Answerer = struct
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
|||| st.conn
end >>= function
| Ok Bootstrap -> begin
| Ok (_, Bootstrap) -> begin
st.callback.bootstrap () >>= function
| [] ->
worker_loop st
@ -93,13 +93,13 @@ module Answerer = struct
Canceler.cancel st.canceler >>= fun () ->
| Ok (Advertise points) ->
| Ok (_, Advertise points) ->
st.callback.advertise points >>= fun () ->
worker_loop st
| Ok (Message msg) ->
st.callback.message msg >>= fun () ->
| Ok (size, Message msg) ->
st.callback.message size msg >>= fun () ->
worker_loop st
| Ok Disconnect | Error [P2p_io_scheduler.Connection_closed] ->
| Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] ->
Canceler.cancel st.canceler >>= fun () ->
| Error [Lwt_utils.Canceled] ->
@ -181,7 +181,7 @@ and events = {
and ('msg, 'meta) connection = {
canceler : Canceler.t ;
messages : 'msg Lwt_pipe.t ;
messages : (int * 'msg) Lwt_pipe.t ;
conn : 'msg Message.t P2p_connection.t ;
gid_info : (('msg, 'meta) connection, 'meta) Gid_info.t ;
point_info : ('msg, 'meta) connection Point_info.t option ;
@ -248,10 +248,13 @@ let active_connections pool = Gid.Table.length pool.connected_gids
let create_connection pool conn id_point pi gi =
let gid = Gid_info.gid gi in
let canceler = Canceler.create () in
let messages =
Lwt_pipe.create ?size:pool.config.incoming_app_message_queue_size () in
let size =
map_option pool.config.incoming_app_message_queue_size
~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in
let messages = Lwt_pipe.create ?size () in
let callback =
{ Answerer.message = Lwt_pipe.push messages ;
{ Answerer.message =
(fun size msg -> Lwt_pipe.push messages (size, msg)) ;
advertise = register_new_points pool gid ;
bootstrap = list_known_points pool gid ;
} in
@ -471,7 +474,7 @@ let accept pool fd point =
let read { messages } =
(fun () -> Lwt_pipe.pop messages >>= return)
(fun () -> Lwt_pipe.pop messages >>= fun ( _, msg) -> return msg)
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
let is_readable { messages } =
@ -315,6 +315,13 @@ let create
exception Closed
let read_size = function
| Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf
| Error _ -> 0 (* we push Error only when we close the socket,
we don't fear memory leaks in that case... *)
let write_size mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes
let register =
let cpt = ref 0 in
fun st conn ->
@ -324,8 +331,12 @@ let register =
end else begin
let id = incr cpt; !cpt in
let canceler = Canceler.create () in
let read_queue = Lwt_pipe.create ?size:st.read_queue_size ()
and write_queue = Lwt_pipe.create ?size:st.write_queue_size () in
let read_size =
map_option st.read_queue_size ~f:(fun v -> v, read_size) in
let write_size =
map_option st.write_queue_size ~f:(fun v -> v, write_size) in
let read_queue = Lwt_pipe.create ?size:read_size () in
let write_queue = Lwt_pipe.create ?size:write_size () in
let read_conn =
st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id
@ -41,7 +41,7 @@ val create:
(** [create ~max_upload_speed ~max_download_speed ~read_queue_size
~write_queue_size ()] is an IO scheduler with specified (global)
max upload (resp. download) speed, and specified read
(resp. write) queue sizes for connections. *)
(resp. write) queue sizes (in bytes) for connections. *)
val register: t -> Lwt_unix.file_descr -> connection
(** [register sched fd] is a [connection] managed by [sched]. *)
@ -10,25 +10,30 @@
open Lwt.Infix
type 'a t =
{ queue : 'a Queue.t ;
size : int option ;
{ queue : (int * 'a) Queue.t ;
mutable current_size : int ;
max_size : int ;
compute_size : ('a -> int) ;
mutable closed : bool ;
mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ;
mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ;
empty: unit Lwt_condition.t ;
full: unit Lwt_condition.t ;
not_full : unit Lwt_condition.t ;
let create ?size () =
let max_size, compute_size =
match size with
| None -> max_int, (fun _ -> 0)
| Some (max_size, compute_size) ->
max_size, (fun e -> 4 * (Sys.word_size / 8) + compute_size e) in
{ queue = Queue.create () ;
size ;
current_size = 0 ;
max_size ;
compute_size ;
closed = false ;
push_waiter = None ;
pop_waiter = None ;
empty = Lwt_condition.create () ;
full = Lwt_condition.create () ;
not_full = Lwt_condition.create () ;
let notify_push q =
@ -61,49 +66,40 @@ let wait_pop q =
q.pop_waiter <- Some (waiter, wakener) ;
Lwt.protected waiter
let available_space { size } len =
match size with
| None -> true
| Some size -> len < size
let length { queue } = Queue.length queue
let is_empty { queue } = Queue.is_empty queue
let is_full ({ queue } as q) = not (available_space q (Queue.length queue))
let rec empty q =
if is_empty q
then Lwt.return_unit
else (Lwt_condition.wait q.empty >>= fun () -> empty q)
let rec full q =
if is_full q
then Lwt.return_unit
else (Lwt_condition.wait q.full >>= fun () -> full q)
let rec not_full q =
if not (is_empty q)
then Lwt.return_unit
else (Lwt_condition.wait q.not_full >>= fun () -> not_full q)
exception Closed
let rec push ({ closed ; queue ; full } as q) elt =
let len = Queue.length queue in
if closed then Closed
else if available_space q len then begin
Queue.push elt queue ;
let rec push ({ closed ; queue ; current_size ;
max_size ; compute_size} as q) elt =
let elt_size = compute_size elt in
if closed then
|||| Closed
else if current_size + elt_size < max_size || Queue.is_empty queue then begin
Queue.push (elt_size, elt) queue ;
q.current_size <- current_size + elt_size ;
notify_push q ;
(if not (available_space q (len + 1)) then Lwt_condition.signal full ());
end else
wait_pop q >>= fun () ->
push q elt
let rec push_now ({ closed ; queue ; full } as q) elt =
let rec push_now ({ closed ; queue ; compute_size ;
current_size ; max_size
} as q) elt =
if closed then raise Closed ;
let len = Queue.length queue in
available_space q len && begin
Queue.push elt queue ;
let elt_size = compute_size elt in
(current_size + elt_size < max_size || Queue.is_empty queue)
&& begin
Queue.push (elt_size, elt) queue ;
q.current_size <- current_size + elt_size ;
notify_push q ;
(if not (available_space q (len + 1)) then Lwt_condition.signal full ()) ;
@ -112,27 +108,11 @@ exception Full
let push_now_exn q elt =
if not (push_now q elt) then raise Full
let rec pop_all ({ closed ; queue ; empty ; not_full } as q) =
let was_full = is_full q in
let rec pop ({ closed ; queue ; empty ; current_size } as q) =
if not (Queue.is_empty queue) then
let queue_copy = Queue.copy queue in
Queue.clear queue;
let (elt_size, elt) = Queue.pop queue in
notify_pop q ;
(if was_full then Lwt_condition.signal not_full ());
Lwt_condition.signal empty ();
Lwt.return queue_copy
else if closed then
|||| Closed
wait_push q >>= fun () ->
pop_all q
let rec pop ({ closed ; queue ; empty ; not_full } as q) =
let was_full = is_full q in
if not (Queue.is_empty queue) then
let elt = Queue.pop queue in
notify_pop q ;
(if was_full then Lwt_condition.signal not_full ());
q.current_size <- current_size - elt_size ;
(if Queue.length queue = 0 then Lwt_condition.signal empty ());
Lwt.return elt
else if closed then
@ -143,7 +123,7 @@ let rec pop ({ closed ; queue ; empty ; not_full } as q) =
let rec peek ({ closed ; queue } as q) =
if not (Queue.is_empty queue) then
let elt = Queue.peek queue in
let (_elt_size, elt) = Queue.peek queue in
Lwt.return elt
else if closed then
|||| Closed
@ -153,28 +133,15 @@ let rec peek ({ closed ; queue } as q) =
exception Empty
let pop_now_exn ({ closed ; queue ; empty ; not_full } as q) =
let was_full = is_full q in
let pop_now_exn ({ closed ; queue ; empty ; current_size } as q) =
if Queue.is_empty queue then
(if closed then raise Closed else raise Empty) ;
let elt = Queue.pop queue in
(if was_full then Lwt_condition.signal not_full ());
let (elt_size, elt) = Queue.pop queue in
(if Queue.length queue = 0 then Lwt_condition.signal empty ());
q.current_size <- current_size - elt_size ;
notify_pop q ;
let pop_all_now ({ closed ; queue ; empty ; not_full } as q) =
let was_empty = is_empty q in
let was_full = is_full q in
if Queue.is_empty queue then
(if closed then raise Closed else raise Empty) ;
let queue_copy = Queue.copy queue in
Queue.clear queue ;
(if was_full then Lwt_condition.signal not_full ());
(if not was_empty then Lwt_condition.signal empty ());
notify_pop q ;
let pop_now q =
match pop_now_exn q with
| exception Empty -> None
@ -195,7 +162,6 @@ let close q =
q.closed <- true ;
notify_push q ;
notify_pop q ;
Lwt_condition.broadcast_exn q.full Closed ;
let rec iter q ~f =
@ -14,18 +14,15 @@
type 'a t
(** Type of queues holding values of type ['a]. *)
val create : ?size:int -> unit -> 'a t
(** [create ~size] is an empty queue that can hold max [size]
elements. *)
val create : ?size:(int * ('a -> int)) -> unit -> 'a t
(** [create ~size:(max_size, compute_size)] is an empty queue that can
hold max [size] bytes of data, using [compute_size] to compute the
size of a datum. *)
val push : 'a t -> 'a -> unit Lwt.t
(** [push q v] is a thread that blocks while [q] contains more
than [size] elements, then adds [v] at the end of [q]. *)
val pop_all : 'a t -> 'a Queue.t Lwt.t
(** [pop' q] is a thread that returns all elements in [q] or waits
till there is at least one element in [q]. *)
val pop : 'a t -> 'a Lwt.t
(** [pop q] is a thread that blocks while [q] is empty, then
removes and returns the first element in [q]. *)
@ -48,10 +45,6 @@ val push_now_exn : 'a t -> 'a -> unit
(** [push_now q v] adds [v] at the ends of [q] immediately or
raise [Full] if [q] is currently full. *)
val pop_all_now : 'a t -> 'a Queue.t
(** [pop_all_now q] is a copy of [q]'s internal queue, that may be
empty. *)
val pop_now : 'a t -> 'a option
(** [pop_now q] maybe removes and returns the first element in [q] if
[q] contains at least one element. *)
@ -68,18 +61,9 @@ val length : 'a t -> int
val is_empty : 'a t -> bool
(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
val is_full : 'a t -> bool
(** [is_full q] is [true] if [q] is full, [false] otherwise. *)
val empty : 'a t -> unit Lwt.t
(** [empty q] returns when [q] becomes empty. *)
val full : 'a t -> unit Lwt.t
(** [full q] returns when [q] becomes full. *)
val not_full : 'a t -> unit Lwt.t
(** [not_full q] returns when [q] stop being full. *)
val iter : 'a t -> f:('a -> unit Lwt.t) -> unit Lwt.t
(** [iter q ~f] pops all elements of [q] and applies [f] on them. *)
@ -161,14 +161,14 @@ let client addr port =
(* let's exchange a simple message. *)
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|||| conn >>=? fun msg ->
|||| conn >>=? fun (msg_size, msg) ->
assert ( simple_msg msg = 0) ;
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Simple OK" >>= fun () ->
(* let's detect a closed connection on `read`. *)
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
|||| conn >>=? fun msg ->
|||| conn >>=? fun (msg_size, msg) ->
assert ( simple_msg msg = 0) ;
|||| conn >>= fun msg ->
assert (is_connection_closed msg) ;
@ -170,8 +170,8 @@ let max_download_speed = ref None
let max_upload_speed = ref None
let read_buffer_size = ref (1 lsl 14)
let read_queue_size = ref (Some 1)
let write_queue_size = ref (Some 1)
let read_queue_size = ref (Some (1 lsl 14))
let write_queue_size = ref (Some (1 lsl 14))
let delay = ref 60.
let clients = ref 8
