From fdff344989c0278aca3ebef138372aa874d931ae Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 21:35:14 +0100 Subject: [PATCH] Shell: minor cosmetics in `p2p.ml` --- src/node/net/p2p.ml | 518 +++++++++++++++++++++++++------------------- 1 file changed, 294 insertions(+), 224 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 71480b985..777e769af 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -9,8 +9,8 @@ module LU = Lwt_unix module LC = Lwt_condition -open Lwt -open Lwt_utils + +open Lwt.Infix open Logging.Net (* public types *) @@ -150,18 +150,22 @@ module Make (P: PARAMS) = struct (obj6 (req "gid" (Fixed.string gid_length)) (req "port" uint16) - (req "pubKey" Crypto_box.public_key_encoding) + (req "pubkey" Crypto_box.public_key_encoding) (req "proof_of_work" Crypto_box.nonce_encoding) (req "message_nonce" Crypto_box.nonce_encoding) (req "versions" (Variable.list version_encoding))) (function - | Connect { gid ; port ; versions ; public_key ; proof_of_work; message_nonce } -> + | Connect { gid ; port ; public_key ; + proof_of_work ; message_nonce ; versions } -> let port = match port with None -> 0 | Some port -> port in - Some (gid, port, public_key, proof_of_work, message_nonce, versions) + Some (gid, port, public_key, + proof_of_work, message_nonce, versions) | _ -> None) - (fun (gid, port, public_key, proof_of_work, message_nonce, versions) -> - let port = if port = 0 then None else Some port in - Connect { gid ; port ; versions ; public_key ; proof_of_work; message_nonce }); + (fun (gid, port, public_key, + proof_of_work, message_nonce, versions) -> + let port = if port = 0 then None else Some port in + Connect { gid ; port ; versions ; + public_key ; proof_of_work ; message_nonce }); case ~tag:0x01 null (function Disconnect -> Some () | _ -> None) (fun () -> Disconnect); @@ -183,67 +187,78 @@ module Make (P: PARAMS) = struct (* read a message from a TCP socket *) let recv_msg ?(uncrypt = (fun buf -> Some buf)) fd buf = - catch - (fun () -> - assert (MBytes.length buf >= 2 lsl 16) ; - Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () -> - let len = EndianBigstring.BigEndian.get_uint16 buf 0 in - (* TODO timeout read ??? *) - Lwt_utils.read_mbytes ~len fd buf >>= fun () -> - let buf = MBytes.sub buf 0 len in - match uncrypt buf with - | None -> - (* TODO track invalid message *) - return Disconnect - | Some buf -> - match Data_encoding.Binary.of_bytes msg_encoding buf with - | None -> - (* TODO track invalid message *) - return Disconnect - | Some msg -> - Lwt.return msg) + Lwt.catch begin fun () -> + assert (MBytes.length buf >= 2 lsl 16) ; + Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () -> + let len = EndianBigstring.BigEndian.get_uint16 buf 0 in + (* TODO timeout read ??? *) + Lwt_utils.read_mbytes ~len fd buf >>= fun () -> + let buf = MBytes.sub buf 0 len in + match uncrypt buf with + | None -> + (* TODO track invalid message *) + Lwt.return Disconnect + | Some buf -> + match Data_encoding.Binary.of_bytes msg_encoding buf with + | None -> + (* TODO track invalid message *) + Lwt.return Disconnect + | Some msg -> + Lwt.return msg + end (function - | Unix.Unix_error _ | End_of_file -> return Disconnect - | e -> fail e) + | Unix.Unix_error _ | End_of_file -> Lwt.return Disconnect + | e -> Lwt.fail e) (* send a message over a TCP socket *) let send_msg ?crypt fd buf msg = - catch - (fun () -> - match Data_encoding.Binary.write msg_encoding msg buf hdrlen with - | None -> return_false - | Some len -> - match crypt with - | None -> - if len > maxlen then - return_false - else begin - EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; - (* TODO timeout write ??? *) - Lwt_utils.write_mbytes ~len fd buf >>= fun () -> - return true - end - | Some crypt -> - let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in - let len = MBytes.length encbuf in - if len > maxlen then - return_false - else begin - let lenbuf = MBytes.create 2 in - EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; - Lwt_utils.write_mbytes fd lenbuf >>= fun () -> - Lwt_utils.write_mbytes fd encbuf >>= fun () -> - return true - end) + Lwt.catch begin fun () -> + match Data_encoding.Binary.write msg_encoding msg buf hdrlen with + | None -> Lwt.return_false + | Some len -> + match crypt with + | None -> + if len > maxlen then + Lwt.return_false + else begin + EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; + (* TODO timeout write ??? *) + Lwt_utils.write_mbytes ~len fd buf >>= fun () -> + Lwt.return_true + end + | Some crypt -> + let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in + let len = MBytes.length encbuf in + if len > maxlen then + Lwt.return_false + else begin + let lenbuf = MBytes.create 2 in + EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; + Lwt_utils.write_mbytes fd lenbuf >>= fun () -> + Lwt_utils.write_mbytes fd encbuf >>= fun () -> + Lwt.return_true + end + end (function - | Unix.Unix_error _ | End_of_file -> return_false - | e -> fail e) + | Unix.Unix_error _ | End_of_file -> Lwt.return_false + | e -> Lwt.fail e) + + (* The (internal) type of network events, those dispatched from peer + workers to the net and others internal to net workers. *) + type event = + | Disconnected of peer + | Bootstrap of peer + | Recv of peer * P.msg + | Peers of point list + | Contact of point * LU.file_descr + | Connected of peer + | Shutdown (* A peer handle, as a record-encoded object, abstract from the outside world. A hidden Lwt worker is associated to a peer at its creation and is killed using the disconnect callback by net workers (on shutdown of during maintenance). *) - type peer = { + and peer = { gid : gid ; public_key : Crypto_box.public_key ; point : point ; @@ -281,17 +296,6 @@ module Make (P: PARAMS) = struct get_metadata : gid -> P.metadata option ; } - (* The (internal) type of network events, those dispatched from peer - workers to the net and others internal to net workers. *) - type event = - | Disconnected of peer - | Bootstrap of peer - | Recv of peer * P.msg - | Peers of point list - | Contact of point * LU.file_descr - | Connected of peer - | Shutdown - (* Run-time point-or-gid indexed storage, one point is bound to at most one gid, which is the invariant we want to keep both for the connected peers table and the known peers one *) @@ -389,11 +393,11 @@ module Make (P: PARAMS) = struct config limits my_gid my_public_key my_secret_key my_proof_of_work socket (addr, port) push white_listed = (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = canceler () in + let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) let recv ~uncrypt buf = - pick [ recv_msg ~uncrypt socket buf ; - (cancelation () >>= fun () -> return Disconnect) ] in + Lwt.pick [ recv_msg ~uncrypt socket buf ; + (cancelation () >>= fun () -> Lwt.return Disconnect) ] in (* First step: send and receive credentials, makes no difference whether we're trying to connect to a peer or checking an incoming connection, both parties must first present themselves. *) @@ -406,41 +410,49 @@ module Make (P: PARAMS) = struct message_nonce = local_nonce ; port = config.incoming_port ; versions = P.supported_versions }) >>= fun _ -> - pick [ (LU.sleep limits.peer_answer_timeout >>= fun () -> return Disconnect) ; - recv_msg socket buf ] >>= function - | Connect { gid; port = listening_port; versions ; public_key ; proof_of_work ; message_nonce } -> + Lwt.pick + [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> + Lwt.return Disconnect ) ; + recv_msg socket buf ] >>= function + | Connect { gid; port = listening_port; versions ; public_key ; + proof_of_work ; message_nonce } -> debug "(%a) connection requested from %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; let work_proved = Crypto_box.check_proof_of_work public_key proof_of_work Crypto_box.default_target in - if not work_proved then - begin - debug "connection rejected (invalid proof of work)"; + if not work_proved then begin + debug "connection rejected (invalid proof of work)" ; cancel () - end - else - begin match common_version P.supported_versions versions with + end else begin + match common_version P.supported_versions versions with | None -> - debug "(%a) connection rejected (incompatible versions) from %a:%d" + debug + "(%a) connection rejected (incompatible versions) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () | Some version -> if config.closed_network then match listening_port with | Some port when white_listed (addr, port) -> - connected buf local_nonce version gid public_key message_nonce listening_port + connected + buf local_nonce version gid + public_key message_nonce listening_port | Some port -> - debug "(%a) connection rejected (out of the closed network) from %a:%d" + debug + "(%a) connection rejected (out of the closed network) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () | None -> - debug "(%a) connection rejected (out of the closed network) from %a:unknown" + debug + "(%a) connection rejected (out of the closed network) from %a:unknown" pp_gid my_gid Ipaddr.pp_hum addr ; cancel () else - connected buf local_nonce version gid public_key message_nonce listening_port - end + connected + buf local_nonce version gid + public_key message_nonce listening_port + end | Advertise peers -> (* alternatively, one can refuse a connection but reply with some peers, so we accept this info *) @@ -472,7 +484,7 @@ module Make (P: PARAMS) = struct let crypt buf = let nonce = get_nonce remote_nonce in Crypto_box.box my_secret_key public_key buf nonce in - let send p = send_msg ~crypt socket buf p >>= fun _ -> return () in + let send p = send_msg ~crypt socket buf p >>= fun _ -> Lwt.return_unit in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; listening_port ; version ; last_seen ; disconnect ; send } in @@ -500,7 +512,7 @@ module Make (P: PARAMS) = struct in (* Events for the main worker *) push (Connected peer) ; - on_cancel (fun () -> push (Disconnected peer) ; return ()) ; + on_cancel (fun () -> push (Disconnected peer) ; Lwt.return_unit) ; (* Launch the worker *) receiver () in @@ -508,12 +520,13 @@ module Make (P: PARAMS) = struct on_cancel (fun () -> (* send_msg ~crypt socket buf Disconnect >>= fun _ -> *) LU.close socket >>= fun _ -> - return ()) ; + Lwt.return_unit) ; let worker_name = Format.asprintf "(%a) connection handler for %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port in - ignore (worker ~safe:true worker_name ~run:(fun () -> connect buf) ~cancel) ; + ignore (Lwt_utils.worker worker_name + ~safe:true ~run:(fun () -> connect buf) ~cancel) ; (* return the canceler *) cancel @@ -523,7 +536,10 @@ module Make (P: PARAMS) = struct let open Data_encoding in splitted ~json: - (conv Ipaddr.to_string (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) string) + (conv + Ipaddr.to_string + (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) + string) ~binary: (union ~tag_size:`Uint8 [ case ~tag:4 @@ -619,50 +635,50 @@ module Make (P: PARAMS) = struct callback to fill the answers and returns a canceler function *) let discovery_answerer my_gid disco_port cancelation callback = (* init a UDP listening socket on the broadcast canal *) - catch - (fun () -> - let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.(setsockopt main_socket SO_BROADCAST true) ; - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; - return (Some main_socket)) + Lwt.catch begin fun () -> + let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in + LU.(setsockopt main_socket SO_BROADCAST true) ; + LU.(setsockopt main_socket SO_REUSEADDR true) ; + LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; + Lwt.return (Some main_socket) + end (fun exn -> debug "(%a) will not listen to discovery requests (%s)" pp_gid my_gid (string_of_unix_exn exn) ; - return None) >>= function - | None -> return () + Lwt.return_none) >>= function + | None -> Lwt.return_unit | Some main_socket -> (* the answering function *) let rec step () = let buffer = discovery_message my_gid 0 in let len = MBytes.length buffer in - pick [ (cancelation () >>= fun () -> return None) ; - (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> - return (Some r)) ] >>= function - | Some (len', LU.ADDR_INET (addr, _)) -> - if len' <> len then - step () (* drop bytes, better luck next time ! *) - else - answerable_discovery_message - (Data_encoding.Binary.of_bytes - discovery_message_encoding buffer) - my_gid - (fun _ port -> - catch - (fun () -> - let ipaddr = Ipaddr_unix.of_inet_addr addr in - let ipaddr = Ipaddr.(match ipaddr with V4 addr -> V6 (v6_of_v4 addr) | _ -> ipaddr) in - let addr = Ipaddr_unix.to_inet_addr ipaddr in - let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> - callback ipaddr port socket >>= fun () -> - return ()) - (fun _ -> (* ignore errors *) return ()) >>= fun () -> - step ()) - step - | Some (_, _) -> - step () - | None -> return () + Lwt.pick + [ (cancelation () >>= fun () -> Lwt.return_none) ; + (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> + Lwt.return (Some r)) ] >>= function + | None -> Lwt.return_unit + | Some (len', LU.ADDR_INET (addr, _)) when len' = len -> + answerable_discovery_message + (Data_encoding.Binary.of_bytes + discovery_message_encoding buffer) + my_gid + (fun _ port -> + Lwt.catch begin fun () -> + let ipaddr = + let open Ipaddr in + match Ipaddr_unix.of_inet_addr addr with + | V4 addr -> V6 (v6_of_v4 addr) + | V6 _ as addr -> addr in + let addr = Ipaddr_unix.to_inet_addr ipaddr in + let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in + LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> + callback ipaddr port socket >>= fun () -> + Lwt.return_unit + end + (fun _ -> (* ignore errors *) Lwt.return_unit) >>= fun () -> + step ()) + step + | Some _ -> step () in step () (* Sends dicover messages into space in an exponentially delayed loop, @@ -670,24 +686,28 @@ module Make (P: PARAMS) = struct let discovery_sender my_gid disco_port inco_port cancelation restart = let msg = discovery_message my_gid inco_port in let rec loop delay n = - catch - (fun () -> - let socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.setsockopt socket LU.SO_BROADCAST true ; - LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () -> - Lwt_utils.write_mbytes socket msg >>= fun _ -> - LU.close socket) + Lwt.catch begin fun () -> + let socket = LU.(socket PF_INET SOCK_DGRAM 0) in + LU.setsockopt socket LU.SO_BROADCAST true ; + let broadcast_ipv4 = Unix.inet_addr_of_string "255.255.255.255" in + LU.connect socket + LU.(ADDR_INET (broadcast_ipv4, disco_port)) >>= fun () -> + Lwt_utils.write_mbytes socket msg >>= fun _ -> + LU.close socket + end (fun _ -> debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; - return ()) >>= fun () -> - pick [ (LU.sleep delay >>= fun () -> return (Some (delay, n + 1))) ; - (cancelation () >>= fun () -> return None) ; - (LC.wait restart >>= fun () -> return (Some (0.1, 0))) ] >>= function + Lwt.return_unit) >>= fun () -> + Lwt.pick + [ (LU.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ; + (cancelation () >>= fun () -> Lwt.return_none) ; + (LC.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ] + >>= function | Some (delay, n) when n = 10 -> loop delay 9 | Some (delay, n) -> loop (delay *. 2.) n - | None -> return () + | None -> Lwt.return_unit in loop 0.2 1 (* Main network creation and initialisation function *) @@ -695,7 +715,7 @@ module Make (P: PARAMS) = struct (* we need to ignore SIGPIPEs *) Sys.(set_signal sigpipe Signal_ignore) ; (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = canceler () in + let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* create the internal event queue *) let enqueue_event, dequeue_event = let queue, enqueue = Lwt_stream.create () in @@ -709,17 +729,19 @@ module Make (P: PARAMS) = struct (fun () -> Lwt_stream.next queue), (fun () -> enqueue None) in - on_cancel (fun () -> close_msg_queue () ; return ()) ; + on_cancel (fun () -> close_msg_queue () ; Lwt.return_unit) ; (* fill the known peers pools from last time *) Data_encoding.Json.read_file config.peers_file >>= fun res -> - let known_peers, black_list, my_gid, my_public_key, my_secret_key, my_proof_of_work = + let known_peers, black_list, my_gid, + my_public_key, my_secret_key, my_proof_of_work = let init_peers () = let my_gid = fresh_gid () in let (my_secret_key, my_public_key) = Crypto_box.random_keypair () in let my_proof_of_work = - Crypto_box.generate_proof_of_work my_public_key Crypto_box.default_target in + Crypto_box.generate_proof_of_work + my_public_key Crypto_box.default_target in let known_peers = let source = { unreachable_since = None ; connections = None ; @@ -732,7 +754,8 @@ module Make (P: PARAMS) = struct PeerMap.empty config.known_peers in let black_list = BlackList.empty in - known_peers, black_list, my_gid, my_public_key, my_secret_key, my_proof_of_work in + known_peers, black_list, my_gid, + my_public_key, my_secret_key, my_proof_of_work in match res with | None -> let known_peers, black_list, my_gid, @@ -809,36 +832,41 @@ module Make (P: PARAMS) = struct in Data_encoding.Json.write_file config.peers_file json >>= fun _ -> debug "(%a) peer cache saved" pp_gid my_gid ; - return ()) ; + Lwt.return_unit) ; (* storage of active and not yet active peers *) let incoming = ref PointMap.empty in let connected = ref PeerMap.empty in (* peer welcoming (accept) loop *) let welcome () = match config.incoming_port with - | None -> (* no input port => no welcome worker *) return () + | None -> (* no input port => no welcome worker *) Lwt.return_unit | Some port -> (* open port for incoming connexions *) let addr = Unix.inet6_addr_any in - catch - (fun () -> - let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (addr, port))) ; - LU.listen main_socket limits.max_connections ; - return (Some main_socket)) + Lwt.catch begin fun () -> + let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in + LU.(setsockopt main_socket SO_REUSEADDR true) ; + LU.(bind main_socket (ADDR_INET (addr, port))) ; + LU.listen main_socket limits.max_connections ; + Lwt.return (Some main_socket) + end (fun exn -> debug "(%a) cannot accept incoming peers (%s)" pp_gid my_gid (string_of_unix_exn exn) ; - return None)>>= function + Lwt.return_none) + >>= function | None -> (* FIXME: run in degraded mode, better exit ? *) - return () + Lwt.return_unit | Some main_socket -> (* then loop *) let rec step () = - pick [ (LU.accept main_socket >>= fun (s, a) -> return (Some (s, a))) ; - (cancelation () >>= fun _ -> return None) ] >>= function + Lwt.pick + [ ( LU.accept main_socket >>= fun (s, a) -> + Lwt.return (Some (s, a)) ) ; + ( cancelation () >>= fun _ -> + Lwt.return_none ) ] + >>= function | None -> LU.close main_socket | Some (socket, addr) -> @@ -863,11 +891,17 @@ module Make (P: PARAMS) = struct let just_maintained = LC.create () in (* maintenance worker, returns when [connections] peers are connected *) let rec maintenance () = - pick [ (LU.sleep 120. >>= fun () -> return true) ; (* every two minutes *) - (LC.wait please_maintain >>= fun () -> return true) ; (* when asked *) - (LC.wait too_few_peers >>= fun () -> return true) ; (* limits *) - (LC.wait too_many_peers >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> + Lwt.pick + [ ( LU.sleep 120. >>= fun () -> + Lwt.return_true) ; (* every two minutes *) + ( LC.wait please_maintain >>= fun () -> + Lwt.return_true) ; (* when asked *) + ( LC.wait too_few_peers >>= fun () -> + Lwt.return_true) ; (* limits *) + ( LC.wait too_many_peers >>= fun () -> + Lwt.return_true) ; + ( cancelation () >>= fun () -> + Lwt.return_false) ] >>= fun continue -> let rec maintain () = let n_connected = PeerMap.cardinal !connected in if n_connected >= limits.expected_connections @@ -895,24 +929,30 @@ module Make (P: PARAMS) = struct not (PeerMap.mem_by_gid gid !connected)) in let rec do_contact_loop strec = match strec with - | 0, _ -> return true - | _, [] -> return false (* we didn't manage to contact enough peers *) + | 0, _ -> Lwt.return_true + | _, [] -> + Lwt.return_false (* we didn't manage to contact enough peers *) | nb, ((addr, port), gid, source) :: tl -> (* we try to open a connection *) - let socket = LU.(socket (match addr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in + let socket = + let open LU in + let open Ipaddr in + let family = + match addr with V4 _ -> PF_INET | V6 _ -> PF_INET6 in + socket family SOCK_STREAM 0 in let uaddr = Ipaddr_unix.to_inet_addr addr in - catch - (fun () -> - debug "(%a) trying to connect to %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port; - Lwt.pick - [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; - LU.connect socket (LU.ADDR_INET (uaddr, port)) - ] >>= fun () -> - debug "(%a) connected to %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port; - enqueue_event (Contact ((addr, port), socket)) ; - return (nb - 1)) + Lwt.catch begin fun () -> + debug "(%a) trying to connect to %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + Lwt.pick + [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; + LU.connect socket (LU.ADDR_INET (uaddr, port)) + ] >>= fun () -> + debug "(%a) connected to %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port; + enqueue_event (Contact ((addr, port), socket)) ; + Lwt.return (nb - 1) + end (fun exn -> debug "(%a) connection failed to %a:%d (%s)" pp_gid my_gid Ipaddr.pp_hum addr port @@ -924,7 +964,7 @@ module Make (P: PARAMS) = struct { source with unreachable_since = Some now } !known_peers ; LU.close socket >>= fun () -> - return nb) >>= fun nrec -> + Lwt.return nb) >>= fun nrec -> do_contact_loop (nrec, tl) in do_contact_loop (nb, contactable) in @@ -932,21 +972,25 @@ module Make (P: PARAMS) = struct debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ; contact to_contact >>= function | true -> (* enough contacts, now wait for connections *) - pick [ (LC.wait new_peer >>= fun _ -> return true) ; - (LU.sleep 1.0 >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then maintain () else return () + Lwt.pick + [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ; + (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ; + (cancelation () >>= fun () -> Lwt.return_false) ] + >>= fun continue -> + if continue then maintain () else Lwt.return_unit | false -> (* not enough contacts, ask the pals of our pals, discover the local network and then wait *) LC.broadcast restart_discovery () ; (PeerMap.iter (fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap)) !connected ; - pick [ (LC.wait new_peer >>= fun _ -> return true) ; - (LC.wait new_contact >>= fun _ -> return true) ; - (LU.sleep 1.0 >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then maintain () else return ()) + Lwt.pick + [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ; + (LC.wait new_contact >>= fun _ -> Lwt.return_true) ; + (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ; + (cancelation () >>= fun () -> Lwt.return_false) ] + >>= fun continue -> + if continue then maintain () else Lwt.return_unit) else (* too many peers, start the russian roulette *) let to_kill = n_connected - limits.max_connections in @@ -955,13 +999,13 @@ module Make (P: PARAMS) = struct (fun _ _ peer (i, t) -> if i = 0 then (0, t) else (i - 1, t >>= fun () -> peer.disconnect ())) - !connected (to_kill, return ())) >>= fun () -> + !connected (to_kill, Lwt.return_unit)) >>= fun () -> (* and directly skip to the next maintenance request *) LC.broadcast just_maintained () ; debug "(%a) maintenance step ended" pp_gid my_gid ; maintenance () in - if continue then maintain () else return () + if continue then maintain () else Lwt.return_unit in (* select the peers to send on a bootstrap request *) let bootstrap_peers () = @@ -969,7 +1013,6 @@ module Make (P: PARAMS) = struct PeerMap.bindings !known_peers |> List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |> List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> - (* HERE *) (* we simply send the first 50 (or less) known peers *) List.fold_left (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) @@ -977,8 +1020,9 @@ module Make (P: PARAMS) = struct in (* main internal event handling worker *) let rec main () = - pick [ dequeue_event () ; - cancelation () >>= fun () -> return Shutdown ] >>= fun event -> + Lwt.pick + [ dequeue_event () ; + cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> match event with | Disconnected peer -> debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; @@ -1002,7 +1046,8 @@ module Make (P: PARAMS) = struct known_peers := PeerMap.remove_by_point point !known_peers ; known_peers := PeerMap.remove_by_gid peer.gid !known_peers ; (* then assign *) - known_peers := PeerMap.update point ~gid:peer.gid source !known_peers + known_peers := + PeerMap.update point ~gid:peer.gid source !known_peers in update @@ try match PeerMap.by_gid peer.gid !known_peers with | { connections = None ; white_listed } -> @@ -1090,12 +1135,13 @@ module Make (P: PARAMS) = struct peers ; main () | Shutdown -> - return () + Lwt.return_unit in (* blacklist filter *) let rec unblock () = - pick [ (Lwt_unix.sleep 20. >>= fun _ -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> + Lwt.pick + [ (Lwt_unix.sleep 20. >>= fun _ -> Lwt.return_true) ; + (cancelation () >>= fun () -> Lwt.return_false) ] >>= fun continue -> if continue then let now = Unix.gettimeofday () in black_list := BlackList.fold @@ -1110,20 +1156,33 @@ module Make (P: PARAMS) = struct PeerMap.update point ?gid source map) !known_peers PeerMap.empty ; unblock () - else return () + else Lwt.return_unit in (* launch all workers *) - let welcome = worker (Format.asprintf "(%a) welcome" pp_gid my_gid) welcome cancel in - let maintenance = worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in - let main = worker (Format.asprintf "(%a) reception" pp_gid my_gid) main cancel in - let unblock = worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) unblock cancel in + let welcome = + Lwt_utils.worker + (Format.asprintf "(%a) welcome" pp_gid my_gid) + welcome cancel in + let maintenance = + Lwt_utils.worker + (Format.asprintf "(%a) maintenance" pp_gid my_gid) + maintenance cancel in + let main = + Lwt_utils.worker + (Format.asprintf "(%a) reception" pp_gid my_gid) + main cancel in + let unblock = + Lwt_utils.worker + (Format.asprintf "(%a) unblacklister" pp_gid my_gid) + unblock cancel in let discovery_answerer = let buf = MBytes.create 0x100_000 in match config.discovery_port with | Some disco_port -> let answerer () = - discovery_answerer my_gid disco_port cancelation @@ fun addr port socket -> - (* do not reply to ourselves or conncted peers *) + discovery_answerer + my_gid disco_port cancelation @@ fun addr port socket -> + (* do not reply to ourselves or connected peers *) if not (PeerMap.mem_by_point (addr, port) !connected) && (try match PeerMap.gid_by_point (addr, port) !known_peers with | Some gid -> not (PeerMap.mem_by_gid gid !connected) @@ -1136,47 +1195,53 @@ module Make (P: PARAMS) = struct LU.close socket end else begin enqueue_event (Contact ((addr, port), socket)) ; - return () + Lwt.return_unit end else LU.close socket in - worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) answerer cancel - | _ -> return () in + Lwt_utils.worker + (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) + answerer cancel + | _ -> Lwt.return_unit in let discovery_sender = match config.incoming_port, config.discovery_port with | Some inco_port, Some disco_port -> let sender () = - discovery_sender my_gid disco_port inco_port cancelation restart_discovery in - worker (Format.asprintf "(%a) discovery sender" pp_gid my_gid) sender cancel - | _ -> return () in + discovery_sender + my_gid disco_port inco_port cancelation restart_discovery in + Lwt_utils.worker + (Format.asprintf "(%a) discovery sender" pp_gid my_gid) + sender cancel + | _ -> Lwt.return_unit in (* net manipulation callbacks *) let rec shutdown () = debug "(%a) starting network shutdown" pp_gid my_gid ; (* stop accepting clients *) cancel () >>= fun () -> (* wait for both workers to end *) - join [ welcome ; main ; maintenance ; unblock ; - discovery_answerer ; discovery_sender ] >>= fun () -> + Lwt.join [ welcome ; main ; maintenance ; unblock ; + discovery_answerer ; discovery_sender ] >>= fun () -> (* properly shutdown all peers *) let cancelers = PeerMap.fold (fun point _ peer res -> (peer.disconnect () >>= fun () -> connected := PeerMap.remove_by_point point !connected ; - return ()) :: res) + Lwt.return_unit) :: res) !connected @@ PointMap.fold (fun point canceler res -> (canceler () >>= fun () -> incoming := PointMap.remove point !incoming ; - return ()) :: res) + Lwt.return_unit) :: res) !incoming @@ [] in - join cancelers >>= fun () -> + Lwt.join cancelers >>= fun () -> debug "(%a) network shutdown complete" pp_gid my_gid ; - return () + Lwt.return_unit and peers () = PeerMap.fold (fun _ _ peer r -> peer :: r) !connected [] - and find_peer gid = try Some (PeerMap.by_gid gid !connected) with Not_found -> None + and find_peer gid = + try Some (PeerMap.by_gid gid !connected) with Not_found -> None and peer_info (peer : peer) = { gid = peer.gid ; addr = fst peer.point ; @@ -1186,7 +1251,7 @@ module Make (P: PARAMS) = struct and recv_from () = dequeue_msg () and send_to peer msg = - peer.send (Message msg) >>= fun _ -> return () + peer.send (Message msg) >>= fun _ -> Lwt.return_unit and try_send peer msg = Lwt.async (fun () -> peer.send (Message msg)); true and broadcast msg = @@ -1243,12 +1308,15 @@ module Make (P: PARAMS) = struct and get_metadata _gid = None (* TODO: implement *) and set_metadata _gid _meta = () (* TODO: implement *) in - let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in + let net = + { shutdown ; peers ; find_peer ; + recv_from ; send_to ; try_send ; broadcast ; + blacklist ; whitelist ; maintain ; roll ; + peer_info ; get_metadata ; set_metadata } in (* main thread, returns after first successful maintenance *) maintain () >>= fun () -> debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; - return net + Lwt.return net let faked_network = let infinity, wakeup = Lwt.wait () in @@ -1268,8 +1336,10 @@ module Make (P: PARAMS) = struct let peer_info _ = assert false in let get_metadata _ = None in let set_metadata _ _ = () in - { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } + { shutdown ; peers ; find_peer ; + recv_from ; send_to ; try_send ; broadcast ; + blacklist ; whitelist ; maintain ; roll ; + peer_info ; get_metadata ; set_metadata } (* Plug toplevel functions to callback calls. *)