ligo/src/lib_p2p/p2p_io_scheduler.ml

507 lines
16 KiB
OCaml
Raw Normal View History

2017-01-14 16:13:59 +04:00
(**************************************************************************)
(* *)
2018-02-06 00:17:03 +04:00
(* Copyright (c) 2014 - 2018. *)
2017-01-14 16:13:59 +04:00
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(* TODO decide whether we need to preallocate buffers or not. *)
2017-12-04 18:46:26 +04:00
(* Ignore SIGPIPE on every platform but Windows.  Without this, some
   writes on a broken connection deliver a SIGPIPE to the process
   instead of raising an exception that the I/O code can handle.  In
   the node this is already done by Cohttp, so this only matters when
   the P2P layer is used as a stand-alone library. *)
let () =
  match Sys.os_type with
  | "Win32" -> ()
  | _ -> Sys.set_signal Sys.sigpipe Sys.Signal_ignore
2017-01-14 16:13:59 +04:00
include Logging.Make (struct let name = "p2p.io-scheduler" end)
(** Hash tables keyed by plain integers (used below to index
    connections by their numeric id). *)
module Inttbl = Hashtbl.Make(struct
    type t = int
    (* Monomorphic comparison: both sides are constrained to [int]. *)
    let equal : int -> int -> bool = fun a b -> a = b
    let hash key = Hashtbl.hash key
  end)
(* Value passed as [~alpha] to every [Moving_average.create] call in
   this file.  Presumably the smoothing factor of the moving average --
   confirm against [Moving_average]. *)
let alpha = 0.2
(** Pair of endpoints that a [Scheduler] moves data between: messages
    are obtained with [pop] and forwarded with [push]. *)
module type IO = sig
(** Label used by the scheduler in its log messages and as the name of
    its worker. *)
val name: string
(** Handle given to [pop] to obtain the next message. *)
type in_param
(** [pop p] returns the next chunk of data available on [p], or an
    error.  The scheduler calls it once per connection each time it is
    ready to wait for more data on that connection. *)
val pop: in_param -> MBytes.t tzresult Lwt.t
(** Handle given to [push] and [close]. *)
type out_param
(** [push p msg] forwards [msg] to [p].  The scheduler calls it for each
    message successfully obtained from [pop]. *)
val push: out_param -> MBytes.t -> unit tzresult Lwt.t
(** [close p err] is called by the scheduler, with the error that caused
    the closure, when it cancels a connection.  Exceptions it raises are
    ignored by the scheduler. *)
val close: out_param -> error list -> unit Lwt.t
end
(* Error returned by the read/write functions of this file when the
   underlying connection (socket or pipe) is no longer usable. *)
type error += Connection_closed
2017-01-14 16:13:59 +04:00
(** Generic I/O scheduler.

    A scheduler owns a set of connections.  For each connection it
    repeatedly pops a message with [IO.pop] and forwards it with
    [IO.push], while accounting the transferred bytes against a global
    quota (when [max_speed] is set) and a per-connection quota. *)
module Scheduler(IO : IO) = struct

  (* State of one scheduler. *)
  type t = {
    canceler: Lwt_canceler.t ;
    mutable worker: unit Lwt.t ;
    counter: Moving_average.t ;
    max_speed: int option ;
    mutable quota: int ;
    quota_updated: unit Lwt_condition.t ;
    readys: unit Lwt_condition.t ;
    readys_high: (connection * MBytes.t tzresult) Queue.t ;
    readys_low: (connection * MBytes.t tzresult) Queue.t ;
  }

  (* State of one connection attached to a scheduler. *)
  and connection = {
    id: int ;
    mutable closed: bool ;
    canceler: Lwt_canceler.t ;
    in_param: IO.in_param ;
    out_param: IO.out_param ;
    mutable current_pop: MBytes.t tzresult Lwt.t ;
    mutable current_push: unit tzresult Lwt.t ;
    counter: Moving_average.t ;
    mutable quota: int ;
    mutable last_quota: int ;
  }

  (* [cancel conn err] marks [conn] as closed, notifies the output side
     through [IO.close] (ignoring any exception it raises) and triggers
     the connection's canceler.  The work is skipped, via
     [Lwt_utils.unless], when [conn.closed] is already set. *)
  let cancel (conn : connection) err =
    let close_quietly () =
      Lwt.catch
        (fun () -> IO.close conn.out_param err)
        (fun _exn -> Lwt.return_unit) in
    let do_cancel () =
      lwt_debug "Connection closed (%d, %s) " conn.id IO.name >>= fun () ->
      conn.closed <- true ;
      close_quietly () >>= fun () ->
      Lwt_canceler.cancel conn.canceler in
    Lwt_utils.unless conn.closed do_cancel

  (* [waiter st conn] starts a new [IO.pop] on [conn].  Once both that
     pop and the previous push have completed, the result is queued in
     [readys_high] (positive connection quota) or [readys_low], and the
     worker is woken up if both queues were empty.  The previous pop
     must have completed before this is called. *)
  let waiter (st : t) (conn : connection) =
    assert (Lwt.state conn.current_pop <> Sleep) ;
    let pending = IO.pop conn.in_param in
    conn.current_pop <- pending ;
    let enqueue_when_ready () =
      pending >>= fun res ->
      conn.current_push >>= fun _ ->
      let nothing_queued =
        Queue.is_empty st.readys_high && Queue.is_empty st.readys_low in
      let target =
        if conn.quota > 0 then st.readys_high else st.readys_low in
      Queue.push (conn, res) target ;
      if nothing_queued then Lwt_condition.broadcast st.readys () ;
      Lwt.return_unit in
    Lwt.async enqueue_when_ready

  (* Resolves immediately when a result is already queued, otherwise
     waits for the [readys] condition. *)
  let wait_data (st : t) =
    if not (Queue.is_empty st.readys_high)
    || not (Queue.is_empty st.readys_low) then
      Lwt.return_unit
    else
      Lwt_condition.wait st.readys

  (* When a speed limit is set and the global quota is exhausted
     (negative), waits for the next quota update; otherwise just
     yields. *)
  let check_quota (st : t) =
    match st.max_speed with
    | Some _ when st.quota < 0 ->
        lwt_debug "scheduler.wait_quota(%s)" IO.name >>= fun () ->
        Lwt_condition.wait st.quota_updated
    | Some _ | None ->
        Lwt_unix.yield ()

  (* Main loop: waits for quota and data, takes one queued result
     (high-priority queue first) and either forwards the message with
     [IO.push] or cancels the connection on error.  Stops when the
     scheduler's canceler has been triggered. *)
  let rec worker_loop (st : t) =
    check_quota st >>= fun () ->
    lwt_debug "scheduler.wait(%s)" IO.name >>= fun () ->
    Lwt.pick [
      Lwt_canceler.cancelation st.canceler ;
      wait_data st
    ] >>= fun () ->
    if Lwt_canceler.canceled st.canceler then
      Lwt.return_unit
    else begin
      let prio = not (Queue.is_empty st.readys_high) in
      let conn, msg =
        Queue.pop (if prio then st.readys_high else st.readys_low) in
      (* Cancel the faulty connection, then keep serving the others. *)
      let drop err =
        cancel conn err >>= fun () ->
        worker_loop st in
      match msg with
      | Error [ Lwt_utils_unix.Canceled ] ->
          worker_loop st
      | Error ([ Connection_closed
               | Exn ( Lwt_pipe.Closed
                     | Unix.Unix_error ((EBADF | ETIMEDOUT), _, _) ) ]
               as err) ->
          lwt_debug "Connection closed (pop: %d, %s)"
            conn.id IO.name >>= fun () ->
          drop err
      | Error err ->
          lwt_log_error
            "@[Unexpected error in connection (pop: %d, %s):@ %a@]"
            conn.id IO.name pp_print_error err >>= fun () ->
          drop err
      | Ok payload ->
          (* Interpret the outcome of the push; only unexpected errors
             are kept in [current_push]. *)
          let settle : unit tzresult -> unit tzresult Lwt.t = function
            | Ok ()
            | Error [ Lwt_utils_unix.Canceled ] ->
                return ()
            | Error ([ Connection_closed
                     | Exn ( Unix.Unix_error (EBADF, _, _)
                           | Lwt_pipe.Closed ) ]
                     as err) ->
                lwt_debug "Connection closed (push: %d, %s)"
                  conn.id IO.name >>= fun () ->
                cancel conn err >>= fun () ->
                return ()
            | Error err ->
                lwt_log_error
                  "@[Unexpected error in connection (push: %d, %s):@ %a@]"
                  conn.id IO.name pp_print_error err >>= fun () ->
                cancel conn err >>= fun () ->
                Lwt.return (Error err) in
          conn.current_push <- (IO.push conn.out_param payload >>= settle) ;
          let len = MBytes.length payload in
          lwt_debug "Handle: %d (%d, %s)" len conn.id IO.name >>= fun () ->
          Moving_average.add st.counter len ;
          st.quota <- st.quota - len ;
          Moving_average.add conn.counter len ;
          (* The connection quota is only charged for results taken
             from the high-priority queue. *)
          if prio then conn.quota <- conn.quota - len ;
          waiter st conn ;
          worker_loop st
    end

  (* [create max_speed] builds a scheduler and starts its worker.  The
     initial global quota is [max_speed], or [0] when unlimited. *)
  let create max_speed =
    let st : t = {
      canceler = Lwt_canceler.create () ;
      worker = Lwt.return_unit ;
      counter = Moving_average.create ~init:0 ~alpha ;
      max_speed ;
      quota = Option.unopt ~default:0 max_speed ;
      quota_updated = Lwt_condition.create () ;
      readys = Lwt_condition.create () ;
      readys_high = Queue.create () ;
      readys_low = Queue.create () ;
    } in
    let run () = worker_loop st in
    let stop () = Lwt_canceler.cancel st.canceler in
    st.worker <- Lwt_utils.worker IO.name ~run ~cancel:stop ;
    st

  (* [create_connection st in_param out_param canceler id] attaches a
     new connection to [st] and immediately starts waiting for its
     first message. *)
  let create_connection st in_param out_param canceler id =
    debug "scheduler(%s).create_connection (%d)" IO.name id ;
    let conn : connection = {
      id ;
      closed = false ;
      canceler ;
      in_param ;
      out_param ;
      current_pop = Lwt.fail Not_found (* dummy *) ;
      current_push = return () ;
      counter = Moving_average.create ~init:0 ~alpha ;
      quota = 0 ;
      last_quota = 0 ;
    } in
    waiter st conn ;
    conn

  (* Refills the global quota (when a speed limit is set), wakes up the
     worker waiting on it, and promotes to the high-priority queue the
     low-priority results whose connection now has a positive quota. *)
  let update_quota (st : t) =
    debug "scheduler(%s).update_quota" IO.name ;
    begin match st.max_speed with
      | None -> ()
      | Some quota ->
          (* A deficit carries over; an unused surplus does not. *)
          st.quota <- (min st.quota 0) + quota ;
          Lwt_condition.broadcast st.quota_updated ()
    end ;
    if Queue.is_empty st.readys_low then
      ()
    else begin
      let still_low = Queue.create () in
      Queue.iter
        (fun (((conn : connection), _) as entry) ->
           let dest =
             if conn.quota > 0 then st.readys_high else still_low in
           Queue.push entry dest)
        st.readys_low ;
      Queue.clear st.readys_low ;
      Queue.transfer still_low st.readys_low
    end

  (* Triggers the scheduler's canceler and waits for the worker to
     terminate. *)
  let shutdown (st : t) =
    lwt_debug "--> scheduler(%s).shutdown" IO.name >>= fun () ->
    Lwt_canceler.cancel st.canceler >>= fun () ->
    st.worker >>= fun () ->
    lwt_debug "<-- scheduler(%s).shutdown" IO.name

end
(** Scheduler for incoming data: pops chunks from a file descriptor and
    pushes them into a pipe of results. *)
module ReadScheduler = Scheduler(struct

    let name = "io_scheduler(read)"

    (* File descriptor to read from, and maximum size of one read. *)
    type in_param = Lwt_unix.file_descr * int

    (* Reads at most [maxlen] bytes into a fresh buffer.  A zero-length
       read and [ECONNRESET] are both reported as [Connection_closed];
       any other exception is wrapped with [error_exn]. *)
    let pop (fd, maxlen) =
      let read_chunk () =
        let buf = MBytes.create maxlen in
        Lwt_bytes.read fd buf 0 maxlen >>= function
        | 0 -> fail Connection_closed
        | len -> return (MBytes.sub buf 0 len) in
      let on_exn = function
        | Unix.Unix_error (Unix.ECONNRESET, _, _) ->
            fail Connection_closed
        | exn ->
            Lwt.return (error_exn exn) in
      Lwt.catch read_chunk on_exn

    type out_param = MBytes.t tzresult Lwt_pipe.t

    (* Queues the chunk in the pipe; exceptions become [Exn] errors. *)
    let push p msg =
      let enqueue () = Lwt_pipe.push p (Ok msg) >>= return in
      Lwt.catch enqueue (fun exn -> fail (Exn exn))

    (* Best effort: hands the closing error to the reader of the pipe
       and ignores any exception raised while doing so. *)
    let close p err =
      let notify () = Lwt_pipe.push p (Error err) in
      Lwt.catch notify (fun _ -> Lwt.return_unit)

  end)
(** Scheduler for outgoing data: pops buffers from a pipe and writes
    them to a file descriptor. *)
module WriteScheduler = Scheduler(struct

    let name = "io_scheduler(write)"

    type in_param = MBytes.t Lwt_pipe.t

    (* Takes the next buffer from the pipe; any exception is reported
       as [Exn Lwt_pipe.Closed]. *)
    let pop p =
      let dequeue () = Lwt_pipe.pop p >>= return in
      Lwt.catch dequeue (fun _ -> fail (Exn Lwt_pipe.Closed))

    type out_param = Lwt_unix.file_descr

    (* Writes the whole buffer.  [ECONNRESET], [EPIPE], [Lwt.Canceled]
       and [End_of_file] are reported as [Connection_closed]; any other
       exception is wrapped with [error_exn]. *)
    let push fd buf =
      let send () = Lwt_utils_unix.write_mbytes fd buf >>= return in
      let on_exn = function
        | Unix.Unix_error ((Unix.ECONNRESET | Unix.EPIPE), _, _)
        | Lwt.Canceled
        | End_of_file ->
            fail Connection_closed
        | exn ->
            Lwt.return (error_exn exn) in
      Lwt.catch send on_exn

    (* Nothing to do here when a connection is closed. *)
    let close _p _err = Lwt.return_unit

  end)
(* A registered file descriptor, together with its read and write
   sides in the two schedulers. *)
type connection = {
  id: int ;
  sched: t ;
  conn: Lwt_unix.file_descr ;
  canceler: Lwt_canceler.t ;
  read_conn: ReadScheduler.connection ;
  read_queue: MBytes.t tzresult Lwt_pipe.t ;
  write_conn: WriteScheduler.connection ;
  write_queue: MBytes.t Lwt_pipe.t ;
  (* Tail of a received message that did not fit in the buffer of the
     previous read; consumed first by the next read. *)
  mutable partial_read: MBytes.t option ;
}

(* The I/O scheduler: one read scheduler, one write scheduler and the
   table of registered connections, indexed by connection id. *)
and t = {
  mutable closed: bool ;
  connected: connection Inttbl.t ;
  read_scheduler: ReadScheduler.t ;
  write_scheduler: WriteScheduler.t ;
  max_upload_speed: int option ; (* bytes per second. *)
  max_download_speed: int option ;
  read_buffer_size: int ;
  read_queue_size: int option ;
  write_queue_size: int option ;
}
(* Recomputes the per-connection quotas: the current average inflow
   and outflow are split evenly between the registered connections,
   then both schedulers are asked to update their own quota. *)
let reset_quota (st : t) =
  debug "--> reset quota" ;
  let current_inflow =
    (Moving_average.stat st.read_scheduler.counter).Moving_average.average in
  let current_outflow =
    (Moving_average.stat st.write_scheduler.counter).Moving_average.average in
  (* A deficit carries over to the new period; a surplus does not. *)
  let refill previous fair = (min previous 0) + fair in
  begin match Inttbl.length st.connected with
    | nb_conn when nb_conn > 0 ->
        let fair_read_quota = current_inflow / nb_conn in
        let fair_write_quota = current_outflow / nb_conn in
        let assign _id (conn : connection) =
          let (rc : ReadScheduler.connection) = conn.read_conn in
          let (wc : WriteScheduler.connection) = conn.write_conn in
          rc.last_quota <- fair_read_quota ;
          rc.quota <- refill rc.quota fair_read_quota ;
          wc.last_quota <- fair_write_quota ;
          wc.quota <- refill wc.quota fair_write_quota in
        Inttbl.iter assign st.connected
    | _ -> ()
  end ;
  ReadScheduler.update_quota st.read_scheduler ;
  WriteScheduler.update_quota st.write_scheduler
(* Builds an I/O scheduler with its read and write schedulers, and
   registers [reset_quota] to run on every [Moving_average] update.
   The download limit applies to the read scheduler, the upload limit
   to the write scheduler. *)
let create
    ?max_upload_speed ?max_download_speed
    ?read_queue_size ?write_queue_size
    ~read_buffer_size
    () =
  log_info "--> create" ;
  let sched : t = {
    closed = false ;
    connected = Inttbl.create 53 ;
    read_scheduler = ReadScheduler.create max_download_speed ;
    write_scheduler = WriteScheduler.create max_upload_speed ;
    max_upload_speed ;
    max_download_speed ;
    read_buffer_size ;
    read_queue_size ;
    write_queue_size ;
  } in
  let refresh_quotas () = reset_quota sched in
  Moving_average.on_update refresh_quotas ;
  sched
(* Raised by [register] when the scheduler has already been shut
   down. *)
exception Closed
2017-01-24 02:59:16 +04:00
(* Size accounted for one element of a read queue: eight machine words
   plus the payload length plus the pipe's own per-push overhead. *)
let read_size msg =
  match msg with
  | Error _ ->
      (* we push Error only when we close the socket,
         we don't fear memory leaks in that case... *)
      0
  | Ok buf ->
      let word_bytes = Sys.word_size / 8 in
      word_bytes * 8 + MBytes.length buf + Lwt_pipe.push_overhead
(* Size accounted for one element of a write queue: six machine words
   plus the payload length plus the pipe's own per-push overhead. *)
let write_size mbytes =
  let word_bytes = Sys.word_size / 8 in
  let payload = MBytes.length mbytes in
  word_bytes * 6 + payload + Lwt_pipe.push_overhead
2017-01-14 16:13:59 +04:00
(* [register st conn] attaches the file descriptor [conn] to both
   schedulers of [st] and returns the resulting connection.  Ids come
   from a counter private to this function.

   @raise Closed if [st] has been shut down; in that case the file
   descriptor is closed in the background. *)
let register =
  let last_id = ref 0 in
  fun (st : t) conn ->
    (* Guard: refuse new connections once the scheduler is closed. *)
    if st.closed then begin
      Lwt.async (fun () -> Lwt_utils_unix.safe_close conn) ;
      raise Closed
    end ;
    incr last_id ;
    let id = !last_id in
    let canceler = Lwt_canceler.create () in
    let read_bound =
      Option.map st.read_queue_size
        ~f:(fun max_size -> max_size, read_size) in
    let write_bound =
      Option.map st.write_queue_size
        ~f:(fun max_size -> max_size, write_size) in
    let read_queue = Lwt_pipe.create ?size:read_bound () in
    let write_queue = Lwt_pipe.create ?size:write_bound () in
    let read_conn =
      ReadScheduler.create_connection
        st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id
    and write_conn =
      WriteScheduler.create_connection
        st.write_scheduler write_queue conn canceler id in
    (* Runs when the connection is canceled: forget it, release its
       counters, close both queues and finally the descriptor. *)
    let teardown () =
      Inttbl.remove st.connected id ;
      Moving_average.destroy read_conn.counter ;
      Moving_average.destroy write_conn.counter ;
      Lwt_pipe.close write_queue ;
      Lwt_pipe.close read_queue ;
      Lwt_utils_unix.safe_close conn in
    Lwt_canceler.on_cancel canceler teardown ;
    let registered : connection = {
      sched = st ; id ; conn ; canceler ;
      read_queue ; read_conn ;
      write_queue ; write_conn ;
      partial_read = None ;
    } in
    Inttbl.add st.connected id registered ;
    log_info "--> register (%d)" registered.id ;
    registered
2017-01-14 16:13:59 +04:00
(* Queues [msg] for sending, waiting for room in the write queue if
   needed.  Any exception is reported as [Connection_closed]. *)
let write (conn : connection) msg =
  let enqueue () = Lwt_pipe.push conn.write_queue msg >>= return in
  let on_failure _exn = fail Connection_closed in
  Lwt.catch enqueue on_failure
(* Non-blocking variant of [write]: the result is whatever
   [Lwt_pipe.push_now] returns for the connection's write queue. *)
let write_now (conn : connection) msg =
  Lwt_pipe.push_now conn.write_queue msg
(* [read_from conn ?pos ?len buf msg] copies at most [len] bytes of the
   received message [msg] into [buf], starting at offset [pos], and
   returns the number of bytes copied.  [pos] defaults to [0] and [len]
   to the room left in [buf] after [pos].  The part of the message that
   does not fit is saved in [conn.partial_read].  An [Error] message
   yields [Error [Connection_closed]]. *)
let read_from conn ?pos ?len buf msg =
  let capacity = MBytes.length buf in
  let offset = Option.unopt ~default:0 pos in
  assert (0 <= offset && offset < capacity) ;
  let room = capacity - offset in
  let wanted = Option.unopt ~default:room len in
  assert (wanted <= room) ;
  match msg with
  | Error _ ->
      Error [Connection_closed]
  | Ok chunk ->
      let chunk_len = MBytes.length chunk in
      let copied = min wanted chunk_len in
      MBytes.blit chunk 0 buf offset copied ;
      let leftover = chunk_len - copied in
      (* Keep the unread tail for the next read. *)
      if leftover > 0 then
        conn.partial_read <- Some (MBytes.sub chunk copied leftover) ;
      Ok copied
(* Non-blocking read.  Serves the pending partial message first if
   there is one; otherwise takes whatever [Lwt_pipe.pop_now] returns
   for the read queue ([None] is passed through as [None]).  A closed
   queue is reported as [Some (Error [Connection_closed])]. *)
let read_now conn ?pos ?len buf =
  let copy_into_buf = read_from conn ?pos ?len buf in
  match conn.partial_read with
  | None -> begin
      try Option.map ~f:copy_into_buf (Lwt_pipe.pop_now conn.read_queue)
      with Lwt_pipe.Closed -> Some (Error [Connection_closed])
    end
  | Some pending ->
      conn.partial_read <- None ;
      Some (copy_into_buf (Ok pending))
(* Blocking read.  Serves the pending partial message first if there
   is one; otherwise waits for the next message of the read queue.
   Any exception raised while waiting or copying from the queue is
   reported as [Connection_closed]. *)
let read conn ?pos ?len buf =
  match conn.partial_read with
  | None ->
      let pop_and_copy () =
        Lwt_pipe.pop conn.read_queue >|= fun received ->
        read_from conn ?pos ?len buf received in
      Lwt.catch pop_and_copy (fun _ -> fail Connection_closed)
  | Some pending ->
      conn.partial_read <- None ;
      Lwt.return (read_from conn ?pos ?len buf (Ok pending))
(* [read_full conn ?pos ?len buf] reads repeatedly until exactly [len]
   bytes have been stored in [buf] starting at [pos], or a read fails.
   [pos] defaults to [0] and [len] to the room left after [pos]. *)
let read_full conn ?pos ?len buf =
  let capacity = MBytes.length buf in
  let start = Option.unopt ~default:0 pos in
  let total = Option.unopt ~default:(capacity - start) len in
  assert (0 <= start && start < capacity) ;
  assert (total <= capacity - start) ;
  let rec fill offset remaining =
    match remaining with
    | 0 -> return ()
    | _ ->
        read conn ~pos:offset ~len:remaining buf >>=? fun got ->
        fill (offset + got) (remaining - got) in
  fill start total
(* Combines the write-side ([ws]) and read-side ([rs]) moving-average
   statistics into a single [P2p_stat] record. *)
let convert ~ws ~rs =
  let total_sent = ws.Moving_average.total in
  let total_recv = rs.Moving_average.total in
  let current_outflow = ws.Moving_average.average in
  let current_inflow = rs.Moving_average.average in
  { P2p_stat.total_sent ; total_recv ; current_outflow ; current_inflow }
(* Statistics over the whole scheduler, taken from the counters of the
   read and write schedulers. *)
let global_stat (st : t) =
  convert
    ~ws:(Moving_average.stat st.write_scheduler.counter)
    ~rs:(Moving_average.stat st.read_scheduler.counter)
(* Statistics for a single connection, taken from the counters of its
   read and write sides. *)
let stat (conn : connection) =
  convert
    ~ws:(Moving_average.stat conn.write_conn.counter)
    ~rs:(Moving_average.stat conn.read_conn.counter)
(* [close ?timeout conn] unregisters [conn] from its scheduler, closes
   its write queue and returns the result of the push currently in
   flight on the write side.  When [timeout] is given, the
   intermediate step goes through [Lwt_utils_unix.with_timeout]; an
   error from that step is returned as is, without waiting for the
   push. *)
let close ?timeout (conn : connection) =
  let cancelation_of canceler =
    return (Lwt_canceler.cancelation canceler) in
  lwt_log_info "--> close (%d)" conn.id >>= fun () ->
  Inttbl.remove conn.sched.connected conn.id ;
  Lwt_pipe.close conn.write_queue ;
  let armed =
    match timeout with
    | Some timeout ->
        Lwt_utils_unix.with_timeout
          ~canceler:conn.canceler timeout cancelation_of
    | None ->
        cancelation_of conn.canceler in
  armed >>=? fun _ ->
  conn.write_conn.current_push >>= fun res ->
  lwt_log_info "<-- close (%d)" conn.id >>= fun () ->
  Lwt.return res
(* Applies [f id conn] to every connection currently registered. *)
let iter_connection (st : t) f =
  Inttbl.iter f st.connected
(* [shutdown ?timeout st] marks the scheduler as closed (so that
   [register] refuses new connections), stops the read scheduler,
   closes every registered connection, and finally stops the write
   scheduler.  Errors returned by individual [close] calls are
   ignored. *)
let shutdown ?timeout st =
  lwt_log_info "--> shutdown" >>= fun () ->
  st.closed <- true ;
  ReadScheduler.shutdown st.read_scheduler >>= fun () ->
  (* Take a snapshot of the connections before closing any of them:
     [close] (and the cancelation hook installed by [register]) removes
     entries from [st.connected], and the behavior of [Hashtbl.fold] is
     unspecified when the table is modified during the traversal. *)
  let conns =
    List.rev
      (Inttbl.fold (fun _id conn acc -> conn :: acc) st.connected []) in
  (* As before, all the closes are started up front and then awaited
     together. *)
  let closing =
    List.map
      (fun conn -> close ?timeout conn >>= fun _ -> Lwt.return_unit)
      conns in
  Lwt.join closing >>= fun () ->
  WriteScheduler.shutdown st.write_scheduler >>= fun () ->
  lwt_log_info "<-- shutdown" >>= fun () ->
  Lwt.return_unit