diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 5051819b9..9cbb372d3 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -13,6 +13,13 @@ module LC = Lwt_condition open Lwt.Infix open Logging.Net +type error += Encoding_error +type error += Message_too_big +type error += Write_would_block +type error += Decipher_error +type error += Canceled +type error += Timeout + (* public types *) type addr = Ipaddr.t type port = int @@ -195,51 +202,46 @@ module Make (P: PARAMS) = struct match uncrypt buf with | None -> (* TODO track invalid message *) - Lwt.return Disconnect + Error_monad.fail Decipher_error | Some buf -> match Data_encoding.Binary.of_bytes msg_encoding buf with | None -> (* TODO track invalid message *) - Lwt.return Disconnect + Error_monad.fail Encoding_error | Some msg -> - Lwt.return msg + Error_monad.return (len, msg) end - (function - | Unix.Unix_error _ | End_of_file -> Lwt.return Disconnect - | e -> Lwt.fail e) + (fun exn -> Lwt.return @@ Error_monad.error_exn exn) (* send a message over a TCP socket *) let send_msg ?crypt fd buf msg = Lwt.catch begin fun () -> match Data_encoding.Binary.write msg_encoding msg buf hdrlen with - | None -> Lwt.return_false + | None -> Error_monad.fail Encoding_error | Some len -> match crypt with | None -> - if len > maxlen then - Lwt.return_false + if len > maxlen then Error_monad.fail Message_too_big else begin EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; (* TODO timeout write ??? *) Lwt_utils.write_mbytes ~len fd buf >>= fun () -> - Lwt.return_true + Error_monad.return len end | Some crypt -> let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in let len = MBytes.length encbuf in - if len > maxlen then - Lwt.return_false + if len > maxlen then Error_monad.fail Message_too_big else begin let lenbuf = MBytes.create 2 in EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; Lwt_utils.write_mbytes fd lenbuf >>= fun () -> Lwt_utils.write_mbytes fd encbuf >>= fun () -> - Lwt.return_true + Error_monad.return len end end - (function - | Unix.Unix_error _ | End_of_file -> Lwt.return_false - | e -> Lwt.fail e) + (fun exn -> Lwt.return @@ Error_monad.error_exn exn) + (* The (internal) type of network events, those dispatched from peer workers to the net and others internal to net workers. *) @@ -393,9 +395,11 @@ module Make (P: PARAMS) = struct (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) - let recv ~uncrypt buf = - Lwt.pick [ recv_msg ~uncrypt socket buf ; - (cancelation () >>= fun () -> Lwt.return Disconnect) ] in + let recv ?uncrypt buf = + Lwt.pick [ recv_msg ?uncrypt socket buf ; + (cancelation () >>= fun () -> Error_monad.fail Canceled) ] + >>=? fun (_size, message) -> + return message in (* First step: send and receive credentials, makes no difference whether we're trying to connect to a peer or checking an incoming connection, both parties must first present themselves. *) @@ -410,10 +414,14 @@ module Make (P: PARAMS) = struct versions = P.supported_versions }) >>= fun _ -> Lwt.pick [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> - Lwt.return Disconnect ) ; - recv_msg socket buf ] >>= function - | Connect { gid; port = listening_port; versions ; public_key ; - proof_of_work ; message_nonce } -> + Error_monad.fail Timeout ) ; + recv buf ] >>= function + | Error err -> + debug "(%a) error receiving from %a:%d: %a" + pp_gid my_gid Ipaddr.pp_hum addr port Error_monad.pp_print_error err ; + cancel () + | Ok (Connect { gid; port = listening_port; versions ; + public_key ; proof_of_work ; message_nonce }) -> debug "(%a) connection requested from %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; let work_proved = @@ -451,18 +459,18 @@ module Make (P: PARAMS) = struct buf local_nonce version gid public_key message_nonce listening_port end - | Advertise peers -> + | Ok (Advertise peers) -> (* alternatively, one can refuse a connection but reply with some peers, so we accept this info *) debug "(%a) new peers received from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; push (Peers peers) ; cancel () - | Disconnect -> + | Ok Disconnect -> debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () - | _ -> + | Ok _ -> debug "(%a) connection rejected (bad connection request) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () @@ -496,17 +504,19 @@ module Make (P: PARAMS) = struct | Some _ as res -> res in (* The message reception loop. *) let rec receiver () = - recv ~uncrypt buf >>= fun message -> - last := Unix.gettimeofday () ; - match message with - | Connect _ - | Disconnect -> + recv ~uncrypt buf >>= function + | Error err -> + debug "(%a) error receiving: %a" + pp_gid my_gid Error_monad.pp_print_error err ; + cancel () + | Ok Connect _ + | Ok Disconnect -> debug "(%a) disconnected (by peer) %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; cancel () - | Bootstrap -> push (Bootstrap peer) ; receiver () - | Advertise peers -> push (Peers peers) ; receiver () - | Message msg -> push (Recv (peer, msg)) ; receiver () + | Ok Bootstrap -> push (Bootstrap peer) ; receiver () + | Ok Advertise peers -> push (Peers peers) ; receiver () + | Ok Message msg -> push (Recv (peer, msg)) ; receiver () in (* Events for the main worker *) push (Connected peer) ; @@ -1176,6 +1186,7 @@ module Make (P: PARAMS) = struct let discovery_answerer = let buf = MBytes.create 0x100_000 in match config.discovery_port with + | None -> Lwt.return_unit | Some disco_port -> let answerer () = discovery_answerer @@ -1198,8 +1209,7 @@ module Make (P: PARAMS) = struct else LU.close socket in Lwt_utils.worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) - answerer cancel - | _ -> Lwt.return_unit in + answerer cancel in let discovery_sender = match config.incoming_port, config.discovery_port with | Some inco_port, Some disco_port ->