Shell: mv p2p errors to shell services

Notes:
1. P2p_socket.Rejected -> P2p_errors.Rejected_socket_connection
   to avoid conflict with P2p_pool.Rejected
2. Connection_closed error in P2p_pool unused and redundant with one
   in P2p_io_scheduler
This commit is contained in:
bruno 2018-02-27 16:33:50 +01:00 committed by Benjamin Canou
parent 7de4ed5622
commit 5a37f6acf1
12 changed files with 93 additions and 99 deletions

View File

@ -421,7 +421,7 @@ let faked_network meta_config = {
set_metadata = (fun _ _ -> ()) ;
recv = (fun _ -> Lwt_utils.never_ending) ;
recv_any = (fun () -> Lwt_utils.never_ending) ;
send = (fun _ _ -> fail P2p_pool.Connection_closed) ;
send = (fun _ _ -> fail P2p_errors.Connection_closed) ;
try_send = (fun _ _ -> false) ;
fold_connections = (fun ~init ~f:_ -> init) ;
iter_connections = (fun _f -> ()) ;

View File

@ -36,8 +36,6 @@ module type IO = sig
val close: out_param -> error list -> unit Lwt.t
end
type error += Connection_closed
module Scheduler(IO : IO) = struct
type t = {
@ -122,7 +120,7 @@ module Scheduler(IO : IO) = struct
match msg with
| Error [ Canceled ] ->
worker_loop st
| Error ([Connection_closed |
| Error ([P2p_errors.Connection_closed |
Exn ( Lwt_pipe.Closed |
Unix.Unix_error ((EBADF | ETIMEDOUT), _, _) )]
as err) ->
@ -142,7 +140,7 @@ module Scheduler(IO : IO) = struct
| Ok ()
| Error [ Canceled ] ->
return ()
| Error ([Connection_closed |
| Error ([P2p_errors.Connection_closed |
Exn (Unix.Unix_error (EBADF, _, _) |
Lwt_pipe.Closed)] as err) ->
lwt_debug "Connection closed (push: %d, %s)"
@ -234,12 +232,12 @@ module ReadScheduler = Scheduler(struct
let buf = MBytes.create maxlen in
Lwt_bytes.read fd buf 0 maxlen >>= fun len ->
if len = 0 then
fail Connection_closed
fail P2p_errors.Connection_closed
else
return (MBytes.sub buf 0 len) )
(function
| Unix.Unix_error(Unix.ECONNRESET, _, _) ->
fail Connection_closed
fail P2p_errors.Connection_closed
| exn ->
Lwt.return (error_exn exn))
type out_param = MBytes.t tzresult Lwt_pipe.t
@ -270,7 +268,7 @@ module WriteScheduler = Scheduler(struct
| Unix.Unix_error(Unix.EPIPE, _, _)
| Lwt.Canceled
| End_of_file ->
fail Connection_closed
fail P2p_errors.Connection_closed
| exn ->
Lwt.return (error_exn exn))
let close _p _err = Lwt.return_unit
@ -396,7 +394,7 @@ let register =
let write { write_queue } msg =
Lwt.catch
(fun () -> Lwt_pipe.push write_queue msg >>= return)
(fun _ -> fail Connection_closed)
(fun _ -> fail P2p_errors.Connection_closed)
let write_now { write_queue } msg = Lwt_pipe.push_now write_queue msg
let read_from conn ?pos ?len buf msg =
@ -415,7 +413,7 @@ let read_from conn ?pos ?len buf msg =
Some (MBytes.sub msg read_len (msg_len - read_len)) ;
Ok read_len
| Error _ ->
Error [Connection_closed]
Error [P2p_errors.Connection_closed]
let read_now conn ?pos ?len buf =
match conn.partial_read with
@ -427,7 +425,7 @@ let read_now conn ?pos ?len buf =
Option.map
~f:(read_from conn ?pos ?len buf)
(Lwt_pipe.pop_now conn.read_queue)
with Lwt_pipe.Closed -> Some (Error [Connection_closed])
with Lwt_pipe.Closed -> Some (Error [P2p_errors.Connection_closed])
let read conn ?pos ?len buf =
match conn.partial_read with
@ -439,7 +437,7 @@ let read conn ?pos ?len buf =
(fun () ->
Lwt_pipe.pop conn.read_queue >|= fun msg ->
read_from conn ?pos ?len buf msg)
(fun _ -> fail Connection_closed)
(fun _ -> fail P2p_errors.Connection_closed)
let read_full conn ?pos ?len buf =
let maxlen = MBytes.length buf in

View File

@ -44,8 +44,6 @@ val create:
val register: t -> Lwt_unix.file_descr -> connection
(** [register sched fd] is a [connection] managed by [sched]. *)
type error += Connection_closed
val write: connection -> MBytes.t -> unit tzresult Lwt.t
(** [write conn msg] returns [Ok ()] when [msg] has been added to
[conn]'s write queue, or fail with an error. *)

View File

@ -115,10 +115,10 @@ module Answerer = struct
| Ok (size, Message msg) ->
st.callback.message size msg >>= fun () ->
worker_loop st
| Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] ->
| Ok (_, Disconnect) | Error [P2p_errors.Connection_closed] ->
Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| Error [P2p_socket.Decoding_error] ->
| Error [P2p_errors.Decoding_error] ->
(* TODO: Penalize peer... *)
Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
@ -281,7 +281,7 @@ let register_point pool ?trusted _source_peer_id (addr, port as point) =
| point_info -> point_info
let may_register_my_id_point pool = function
| [P2p_socket.Myself (addr, Some port)] ->
| [P2p_errors.Myself (addr, Some port)] ->
P2p_point.Table.add pool.my_id_points (addr, port) () ;
P2p_point.Table.remove pool.known_points (addr, port)
| _ -> ()
@ -341,12 +341,12 @@ let read { messages ; conn } =
lwt_debug "%d bytes message popped from queue %a\027[0m"
s P2p_connection.Info.pp (P2p_socket.info conn) >>= fun () ->
return msg)
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
(fun _ (* Closed *) -> fail P2p_errors.Connection_closed)
let is_readable { messages } =
Lwt.catch
(fun () -> Lwt_pipe.values_available messages >>= return)
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
(fun _ (* Closed *) -> fail P2p_errors.Connection_closed)
let write { conn } msg =
P2p_socket.write conn (Message msg)
@ -523,25 +523,17 @@ let pool_stat { io_sched } =
(***************************************************************************)
type error += Rejected of P2p_peer.Id.t
type error += Pending_connection
type error += Connected
type error += Connection_closed = P2p_io_scheduler.Connection_closed
type error += Connection_refused
type error += Closed_network
type error += Too_many_connections
let fail_unless_disconnected_point point_info =
match P2p_point_state.get point_info with
| Disconnected -> return ()
| Requested _ | Accepted _ -> fail Pending_connection
| Running _ -> fail Connected
| Requested _ | Accepted _ -> fail P2p_errors.Pending_connection
| Running _ -> fail P2p_errors.Connected
let fail_unless_disconnected_peer_id peer_info =
match P2p_peer_state.get peer_info with
| Disconnected -> return ()
| Accepted _ -> fail Pending_connection
| Running _ -> fail Connected
| Accepted _ -> fail P2p_errors.Pending_connection
| Running _ -> fail P2p_errors.Connected
let compare_known_point_info p1 p2 =
(* The most-recently disconnected peers are greater. *)
@ -566,7 +558,7 @@ let compare_known_point_info p1 p2 =
let rec connect ~timeout pool point =
fail_unless
(active_connections pool <= pool.config.max_connections)
Too_many_connections >>=? fun () ->
P2p_errors.Too_many_connections >>=? fun () ->
let canceler = Lwt_canceler.create () in
with_timeout ~canceler (Lwt_unix.sleep timeout) begin fun canceler ->
let point_info =
@ -574,7 +566,7 @@ let rec connect ~timeout pool point =
let addr, port as point = P2p_point_state.Info.point point_info in
fail_unless
(not pool.config.closed_network || P2p_point_state.Info.trusted point_info)
Closed_network >>=? fun () ->
P2p_errors.Closed_network >>=? fun () ->
fail_unless_disconnected_point point_info >>=? fun () ->
P2p_point_state.set_requested point_info canceler ;
let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in
@ -591,7 +583,7 @@ let rec connect ~timeout pool point =
Lwt_utils_unix.safe_close fd >>= fun () ->
match err with
| [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] ->
fail Connection_refused
fail P2p_errors.Connection_refused
| err -> Lwt.return (Error err)
end >>=? fun () ->
lwt_debug "connect: %a -> authenticate" P2p_point.Id.pp point >>= fun () ->
@ -724,7 +716,7 @@ and authenticate pool ?point_info canceler fd point =
Option.iter ~f:P2p_point_state.set_disconnected point_info ;
(* FIXME P2p_peer_state.set_disconnected ~requested:true peer_info ; *)
end ;
fail (Rejected info.peer_id)
fail (P2p_errors.Rejected info.peer_id)
end
and create_connection pool p2p_conn id_point point_info peer_info _version =

View File

@ -180,13 +180,6 @@ type ('msg, 'meta) connection
meta-information and data-structures describing a more
fine-grained logical state of the connection. *)
type error += Pending_connection
type error += Connected
type error += Connection_refused
type error += Rejected of P2p_peer.Id.t
type error += Too_many_connections
type error += Closed_network
val connect:
timeout:float ->
('msg, 'meta) pool -> P2p_point.Id.t ->
@ -235,8 +228,6 @@ val on_new_connection:
(** {1 I/O on connections} *)
type error += Connection_closed
val read: ('msg, 'meta) connection -> 'msg tzresult Lwt.t
(** [read conn] returns a message popped from [conn]'s app message
queue, or fails with [Connection_closed]. *)

View File

@ -19,16 +19,6 @@
include Logging.Make(struct let name = "p2p.connection" end)
type error += Decipher_error
type error += Invalid_message_size
type error += Encoding_error
type error += Rejected
type error += Decoding_error
type error += Myself of P2p_connection.Id.t
type error += Not_enough_proof_of_work of P2p_peer.Id.t
type error += Invalid_auth
type error += Invalid_chunks_size of { value: int ; min: int ; max: int }
module Crypto = struct
let bufsize = 1 lsl 16 - 1
@ -44,7 +34,7 @@ module Crypto = struct
let write_chunk fd cryptobox_data msg =
let msglen = MBytes.length msg in
fail_unless
(msglen <= max_content_length) Invalid_message_size >>=? fun () ->
(msglen <= max_content_length) P2p_errors.Invalid_message_size >>=? fun () ->
let buf = MBytes.init (msglen + Crypto_box.zerobytes) '\x00' in
MBytes.blit msg 0 buf Crypto_box.zerobytes msglen ;
let local_nonce = cryptobox_data.local_nonce in
@ -72,7 +62,7 @@ module Crypto = struct
cryptobox_data.channel_key remote_nonce buf
with
| false ->
fail Decipher_error
fail P2p_errors.Decipher_error
| true ->
return (MBytes.sub buf Crypto_box.zerobytes
(encrypted_length - Crypto_box.boxzerobytes))
@ -84,7 +74,7 @@ let check_binary_chunks_size size =
fail_unless
(value > 0 &&
value <= Crypto.max_content_length)
(Invalid_chunks_size
(P2p_errors.Invalid_chunks_size
{ value = size ;
min = Crypto.(header_length + Crypto_box.boxzerobytes + 1) ;
max = Crypto.bufsize ;
@ -125,15 +115,15 @@ module Connection_message = struct
Data_encoding.Binary.length encoding message in
fail_unless
(encoded_message_len < 1 lsl (Crypto.header_length * 8))
Encoding_error >>=? fun () ->
P2p_errors.Encoding_error >>=? fun () ->
let len = Crypto.header_length + encoded_message_len in
let buf = MBytes.create len in
match Data_encoding.Binary.write
encoding message buf Crypto.header_length with
| None ->
fail Encoding_error
fail P2p_errors.Encoding_error
| Some last ->
fail_unless (last = len) Encoding_error >>=? fun () ->
fail_unless (last = len) P2p_errors.Encoding_error >>=? fun () ->
MBytes.set_int16 buf 0 encoded_message_len ;
P2p_io_scheduler.write fd buf
@ -146,10 +136,10 @@ module Connection_message = struct
P2p_io_scheduler.read_full ~len fd buf >>=? fun () ->
match Data_encoding.Binary.read encoding buf 0 len with
| None ->
fail Decoding_error
fail P2p_errors.Decoding_error
| Some (read_len, message) ->
if read_len <> len then
fail Decoding_error
fail P2p_errors.Decoding_error
else
return message
@ -201,11 +191,11 @@ let authenticate
let remote_peer_id = Crypto_box.hash msg.public_key in
fail_unless
(remote_peer_id <> identity.P2p_identity.peer_id)
(Myself id_point) >>=? fun () ->
(P2p_errors.Myself id_point) >>=? fun () ->
fail_unless
(Crypto_box.check_proof_of_work
msg.public_key msg.proof_of_work_stamp proof_of_work_target)
(Not_enough_proof_of_work remote_peer_id) >>=? fun () ->
(P2p_errors.Not_enough_proof_of_work remote_peer_id) >>=? fun () ->
let channel_key =
Crypto_box.precompute identity.P2p_identity.secret_key msg.public_key in
let info =
@ -266,7 +256,7 @@ module Reader = struct
match msg with
| None ->
protect ~canceler:st.canceler begin fun () ->
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
Lwt_pipe.push st.messages (Error [P2p_errors.Decoding_error]) >>= fun () ->
return None
end
| Some (msg, size, rem_mbytes) ->
@ -341,7 +331,7 @@ module Writer = struct
let encode_message st msg =
try ok (Data_encoding.Binary.to_bytes_list st.binary_chunks_size st.encoding msg)
with _ -> error Encoding_error
with _ -> error P2p_errors.Encoding_error
let rec worker_loop st =
Lwt_unix.yield () >>= fun () ->
@ -368,13 +358,13 @@ module Writer = struct
Option.iter wakener
~f:(fun u ->
Lwt.wakeup_later u
(Error [P2p_io_scheduler.Connection_closed])) ;
(Error [P2p_errors.Connection_closed])) ;
match err with
| [ Canceled | Exn Lwt_pipe.Closed ] ->
lwt_debug "connection closed to %a"
P2p_connection.Info.pp st.conn.info >>= fun () ->
Lwt.return_unit
| [ P2p_io_scheduler.Connection_closed ] ->
| [ P2p_errors.Connection_closed ] ->
lwt_debug "connection closed to %a"
P2p_connection.Info.pp st.conn.info >>= fun () ->
Lwt_canceler.cancel st.canceler >>= fun () ->
@ -459,11 +449,11 @@ let accept
end ~on_error:begin fun err ->
P2p_io_scheduler.close fd >>= fun _ ->
match err with
| [ P2p_io_scheduler.Connection_closed ] -> fail Rejected
| [ Decipher_error ] -> fail Invalid_auth
| [ P2p_errors.Connection_closed ] -> fail P2p_errors.Rejected_socket_connection
| [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth
| err -> Lwt.return (Error err)
end >>=? fun accepted ->
fail_unless accepted Rejected >>=? fun () ->
fail_unless accepted P2p_errors.Rejected_socket_connection >>=? fun () ->
let canceler = Lwt_canceler.create () in
let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
let reader =
@ -482,11 +472,11 @@ let accept
let catch_closed_pipe f =
Lwt.catch f begin function
| Lwt_pipe.Closed -> fail P2p_io_scheduler.Connection_closed
| Lwt_pipe.Closed -> fail P2p_errors.Connection_closed
| exn -> fail (Exn exn)
end >>= function
| Error [Exn Lwt_pipe.Closed] ->
fail P2p_io_scheduler.Connection_closed
fail P2p_errors.Connection_closed
| Error _ | Ok _ as v -> Lwt.return v
let pp_json encoding ppf msg =
@ -516,7 +506,7 @@ let write_now { writer ; conn } msg =
P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ;
Writer.encode_message writer msg >>? fun buf ->
try Ok (Lwt_pipe.push_now writer.messages (buf, None))
with Lwt_pipe.Closed -> Error [P2p_io_scheduler.Connection_closed]
with Lwt_pipe.Closed -> Error [P2p_errors.Connection_closed]
let rec split_bytes size bytes =
if MBytes.length bytes <= size then
@ -545,7 +535,7 @@ let read { reader } =
end
let read_now { reader } =
try Lwt_pipe.pop_now reader.messages
with Lwt_pipe.Closed -> Some (Error [P2p_io_scheduler.Connection_closed])
with Lwt_pipe.Closed -> Some (Error [P2p_errors.Connection_closed])
let stat { conn = { fd } } = P2p_io_scheduler.stat fd

View File

@ -17,16 +17,6 @@
(** {1 Types} *)
type error += Decipher_error
type error += Invalid_message_size
type error += Encoding_error
type error += Decoding_error
type error += Rejected
type error += Myself of P2p_connection.Id.t
type error += Not_enough_proof_of_work of P2p_peer.Id.t
type error += Invalid_auth
type error += Invalid_chunks_size of { value: int ; min: int ; max: int }
type authenticated_fd
(** Type of a connection that successfully passed the authentication
phase, but has not been accepted yet. *)

View File

@ -6,6 +6,7 @@
test_p2p_io_scheduler))
(libraries (tezos-base
tezos-stdlib-unix
tezos-shell-services
tezos-p2p
alcotest-lwt))
(flags (:standard -w -9-32
@ -13,6 +14,7 @@
-safe-string
-open Tezos_base__TzPervasives
-open Tezos_stdlib_unix
-open Tezos_shell_services
-open Tezos_p2p))))
(alias

View File

@ -71,7 +71,7 @@ let receive conn =
let rec loop () =
P2p_io_scheduler.read conn buf >>= function
| Ok _ -> loop ()
| Error [P2p_io_scheduler.Connection_closed] ->
| Error [P2p_errors.Connection_closed] ->
Lwt.return ()
| Error err -> Lwt.fail (Error err)
in

View File

@ -112,31 +112,31 @@ module Simple = struct
let rec connect ~timeout pool point =
lwt_log_info "Connect to %a" P2p_point.Id.pp point >>= fun () ->
P2p_pool.connect pool point ~timeout >>= function
| Error [P2p_pool.Connected] -> begin
| Error [P2p_errors.Connected] -> begin
match P2p_pool.Connection.find_by_point pool point with
| Some conn -> return conn
| None -> failwith "Woops..."
end
| Error ([ P2p_pool.Connection_refused
| P2p_pool.Pending_connection
| P2p_socket.Rejected
| Error ([ P2p_errors.Connection_refused
| P2p_errors.Pending_connection
| P2p_errors.Rejected_socket_connection
| Canceled
| Timeout
| P2p_pool.Rejected _ as err ]) ->
| P2p_errors.Rejected _ as err ]) ->
lwt_log_info "Connection to %a failed (%a)"
P2p_point.Id.pp point
(fun ppf err -> match err with
| P2p_pool.Connection_refused ->
| P2p_errors.Connection_refused ->
Format.fprintf ppf "connection refused"
| P2p_pool.Pending_connection ->
| P2p_errors.Pending_connection ->
Format.fprintf ppf "pending connection"
| P2p_socket.Rejected ->
| P2p_errors.Rejected_socket_connection ->
Format.fprintf ppf "rejected"
| Canceled ->
Format.fprintf ppf "canceled"
| Timeout ->
Format.fprintf ppf "timeout"
| P2p_pool.Rejected peer ->
| P2p_errors.Rejected peer ->
Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer
| _ -> assert false) err >>= fun () ->
Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () ->
@ -219,7 +219,7 @@ end
module Garbled = struct
let is_connection_closed = function
| Error ((Write | Read) :: P2p_io_scheduler.Connection_closed :: _) -> true
| Error ((Write | Read) :: P2p_errors.Connection_closed :: _) -> true
| Ok _ -> false
| Error err ->
log_info "Unexpected error: %a" pp_print_error err ;

View File

@ -116,14 +116,14 @@ let connect sched addr port id =
return auth_fd
let is_connection_closed = function
| Error [P2p_io_scheduler.Connection_closed] -> true
| Error [P2p_errors.Connection_closed] -> true
| Ok _ -> false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
let is_decoding_error = function
| Error [P2p_socket.Decoding_error] -> true
| Error [P2p_errors.Decoding_error] -> true
| Ok _ -> false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
@ -156,7 +156,7 @@ module Kick = struct
let encoding = Data_encoding.bytes
let is_rejected = function
| Error [P2p_socket.Rejected] -> true
| Error [P2p_errors.Rejected_socket_connection] -> true
| Ok _ -> false
| Error err ->
log_notice "Error: %a" pp_print_error err ;

View File

@ -0,0 +1,33 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(************************ p2p io scheduler ********************************)
type error += Connection_closed
(***************************** p2p socket *********************************)
type error += Decipher_error
type error += Invalid_message_size
type error += Encoding_error
type error += Rejected_socket_connection
type error += Decoding_error
type error += Myself of P2p_connection.Id.t
type error += Not_enough_proof_of_work of P2p_peer.Id.t
type error += Invalid_auth
type error += Invalid_chunks_size of { value: int ; min: int ; max: int }
(***************************** p2p pool ***********************************)
type error += Pending_connection
type error += Connected
type error += Connection_refused
type error += Rejected of P2p_peer.Id.t
type error += Too_many_connections
type error += Closed_network