P2P: recv: return size read
This commit is contained in:
parent
2b27a1ffbe
commit
92f78b29e4
@ -402,12 +402,10 @@ module Make (P: PARAMS) = struct
|
||||
(* a non exception-based cancelation mechanism *)
|
||||
let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
|
||||
(* a cancelable encrypted reception *)
|
||||
let recv ?received ?uncrypt buf =
|
||||
let recv ?uncrypt buf =
|
||||
Lwt.pick [ recv_msg ?uncrypt socket buf ;
|
||||
(cancelation () >>= fun () -> Error_monad.fail Canceled) ]
|
||||
>>=? fun (size, message) ->
|
||||
Utils.iter_option received ~f:(fun r -> r := !r + size) ;
|
||||
return message in
|
||||
in
|
||||
(* First step: send and receive credentials, makes no difference
|
||||
whether we're trying to connect to a peer or checking an incoming
|
||||
connection, both parties must first present themselves. *)
|
||||
@ -434,53 +432,53 @@ module Make (P: PARAMS) = struct
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port
|
||||
Error_monad.pp_print_error err ;
|
||||
cancel ()
|
||||
| Ok (Connect { gid; port = listening_port; versions ;
|
||||
public_key ; proof_of_work ; message_nonce }) ->
|
||||
debug "(%a) connection requested from %a @@ %a:%d"
|
||||
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
|
||||
let work_proved =
|
||||
Crypto_box.check_proof_of_work
|
||||
public_key proof_of_work Crypto_box.default_target in
|
||||
if not work_proved then begin
|
||||
debug "connection rejected (invalid proof of work)" ;
|
||||
| Ok (_, (Connect { gid; port = listening_port; versions ;
|
||||
public_key ; proof_of_work ; message_nonce })) ->
|
||||
debug "(%a) connection requested from %a @@ %a:%d"
|
||||
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
|
||||
let work_proved =
|
||||
Crypto_box.check_proof_of_work
|
||||
public_key proof_of_work Crypto_box.default_target in
|
||||
if not work_proved then begin
|
||||
debug "connection rejected (invalid proof of work)" ;
|
||||
cancel ()
|
||||
end else begin
|
||||
match common_version P.supported_versions versions with
|
||||
| None ->
|
||||
debug
|
||||
"(%a) connection rejected (incompatible versions) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| Some version ->
|
||||
if config.closed_network then
|
||||
match listening_port with
|
||||
| Some port when white_listed (addr, port) ->
|
||||
connected
|
||||
buf local_nonce version gid
|
||||
public_key message_nonce listening_port
|
||||
| Some port ->
|
||||
debug
|
||||
"(%a) connection rejected (out of the closed network) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| None ->
|
||||
debug
|
||||
"(%a) connection rejected (out of the closed network) from %a:unknown"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr ;
|
||||
cancel ()
|
||||
else
|
||||
connected
|
||||
buf local_nonce version gid
|
||||
public_key message_nonce listening_port
|
||||
end
|
||||
| Ok (_, Disconnect) ->
|
||||
debug "(%a) connection rejected (closed by peer or timeout) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| _ ->
|
||||
debug "(%a) connection rejected (bad connection request) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
end else begin
|
||||
match common_version P.supported_versions versions with
|
||||
| None ->
|
||||
debug
|
||||
"(%a) connection rejected (incompatible versions) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| Some version ->
|
||||
if config.closed_network then
|
||||
match listening_port with
|
||||
| Some port when white_listed (addr, port) ->
|
||||
connected
|
||||
buf local_nonce version gid
|
||||
public_key message_nonce listening_port
|
||||
| Some port ->
|
||||
debug
|
||||
"(%a) connection rejected (out of the closed network) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| None ->
|
||||
debug
|
||||
"(%a) connection rejected (out of the closed network) from %a:unknown"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr ;
|
||||
cancel ()
|
||||
else
|
||||
connected
|
||||
buf local_nonce version gid
|
||||
public_key message_nonce listening_port
|
||||
end
|
||||
| Ok Disconnect ->
|
||||
debug "(%a) connection rejected (closed by peer or timeout) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| Ok _ ->
|
||||
debug "(%a) connection rejected (bad connection request) from %a:%d"
|
||||
pp_gid my_gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
|
||||
(* Them we can build the net object and launch the worker. *)
|
||||
and connected buf local_nonce version gid public_key nonce listening_port =
|
||||
@ -535,24 +533,28 @@ module Make (P: PARAMS) = struct
|
||||
| Some _ as res -> res in
|
||||
(* The message reception loop. *)
|
||||
let rec receiver () =
|
||||
recv ~received ~uncrypt buf >>= function
|
||||
recv ~uncrypt buf >>= function
|
||||
| Error err ->
|
||||
debug "(%a) error receiving: %a"
|
||||
pp_gid my_gid Error_monad.pp_print_error err ;
|
||||
cancel ()
|
||||
| Ok Connect _
|
||||
| Ok Disconnect ->
|
||||
debug "(%a) disconnected (by peer) %a @@ %a:%d"
|
||||
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| Ok Bootstrap -> Lwt_pipe.push reader (Bootstrap peer) >>= receiver
|
||||
| Ok Advertise peers -> Lwt_pipe.push reader (Peers peers) >>= receiver
|
||||
| Ok Message msg -> Lwt_pipe.push reader (Recv (peer, msg)) >>= receiver
|
||||
| Ok (size, msg) ->
|
||||
received := !received + size;
|
||||
match msg with
|
||||
| Connect _
|
||||
| Disconnect ->
|
||||
debug "(%a) disconnected (by peer) %a @@ %a:%d"
|
||||
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
|
||||
cancel ()
|
||||
| Bootstrap -> Lwt_pipe.push reader (Bootstrap peer) >>= receiver
|
||||
| Advertise peers -> Lwt_pipe.push reader (Peers peers) >>= receiver
|
||||
| Message msg -> Lwt_pipe.push reader (Recv (peer, msg)) >>= receiver
|
||||
in
|
||||
let rec sender () =
|
||||
Lwt_pipe.pop peer.writer >>= fun msg ->
|
||||
send_msg ~crypt socket buf msg >>= function
|
||||
| Ok _nb_sent ->
|
||||
| Ok size ->
|
||||
sent := !sent + size;
|
||||
sender ()
|
||||
| Error err ->
|
||||
debug "(%a) error sending to %a: %a"
|
||||
|
Loading…
Reference in New Issue
Block a user