From fdff344989c0278aca3ebef138372aa874d931ae Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 21:35:14 +0100 Subject: [PATCH 01/14] Shell: minor cosmetics in `p2p.ml` --- src/node/net/p2p.ml | 518 +++++++++++++++++++++++++------------------- 1 file changed, 294 insertions(+), 224 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 71480b985..777e769af 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -9,8 +9,8 @@ module LU = Lwt_unix module LC = Lwt_condition -open Lwt -open Lwt_utils + +open Lwt.Infix open Logging.Net (* public types *) @@ -150,18 +150,22 @@ module Make (P: PARAMS) = struct (obj6 (req "gid" (Fixed.string gid_length)) (req "port" uint16) - (req "pubKey" Crypto_box.public_key_encoding) + (req "pubkey" Crypto_box.public_key_encoding) (req "proof_of_work" Crypto_box.nonce_encoding) (req "message_nonce" Crypto_box.nonce_encoding) (req "versions" (Variable.list version_encoding))) (function - | Connect { gid ; port ; versions ; public_key ; proof_of_work; message_nonce } -> + | Connect { gid ; port ; public_key ; + proof_of_work ; message_nonce ; versions } -> let port = match port with None -> 0 | Some port -> port in - Some (gid, port, public_key, proof_of_work, message_nonce, versions) + Some (gid, port, public_key, + proof_of_work, message_nonce, versions) | _ -> None) - (fun (gid, port, public_key, proof_of_work, message_nonce, versions) -> - let port = if port = 0 then None else Some port in - Connect { gid ; port ; versions ; public_key ; proof_of_work; message_nonce }); + (fun (gid, port, public_key, + proof_of_work, message_nonce, versions) -> + let port = if port = 0 then None else Some port in + Connect { gid ; port ; versions ; + public_key ; proof_of_work ; message_nonce }); case ~tag:0x01 null (function Disconnect -> Some () | _ -> None) (fun () -> Disconnect); @@ -183,67 +187,78 @@ module Make (P: PARAMS) = struct (* read a message from a TCP socket *) let recv_msg ?(uncrypt = (fun buf -> Some buf)) fd buf = - catch - (fun () -> - assert (MBytes.length buf >= 2 lsl 16) ; - Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () -> - let len = EndianBigstring.BigEndian.get_uint16 buf 0 in - (* TODO timeout read ??? *) - Lwt_utils.read_mbytes ~len fd buf >>= fun () -> - let buf = MBytes.sub buf 0 len in - match uncrypt buf with - | None -> - (* TODO track invalid message *) - return Disconnect - | Some buf -> - match Data_encoding.Binary.of_bytes msg_encoding buf with - | None -> - (* TODO track invalid message *) - return Disconnect - | Some msg -> - Lwt.return msg) + Lwt.catch begin fun () -> + assert (MBytes.length buf >= 2 lsl 16) ; + Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () -> + let len = EndianBigstring.BigEndian.get_uint16 buf 0 in + (* TODO timeout read ??? *) + Lwt_utils.read_mbytes ~len fd buf >>= fun () -> + let buf = MBytes.sub buf 0 len in + match uncrypt buf with + | None -> + (* TODO track invalid message *) + Lwt.return Disconnect + | Some buf -> + match Data_encoding.Binary.of_bytes msg_encoding buf with + | None -> + (* TODO track invalid message *) + Lwt.return Disconnect + | Some msg -> + Lwt.return msg + end (function - | Unix.Unix_error _ | End_of_file -> return Disconnect - | e -> fail e) + | Unix.Unix_error _ | End_of_file -> Lwt.return Disconnect + | e -> Lwt.fail e) (* send a message over a TCP socket *) let send_msg ?crypt fd buf msg = - catch - (fun () -> - match Data_encoding.Binary.write msg_encoding msg buf hdrlen with - | None -> return_false - | Some len -> - match crypt with - | None -> - if len > maxlen then - return_false - else begin - EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; - (* TODO timeout write ??? *) - Lwt_utils.write_mbytes ~len fd buf >>= fun () -> - return true - end - | Some crypt -> - let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in - let len = MBytes.length encbuf in - if len > maxlen then - return_false - else begin - let lenbuf = MBytes.create 2 in - EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; - Lwt_utils.write_mbytes fd lenbuf >>= fun () -> - Lwt_utils.write_mbytes fd encbuf >>= fun () -> - return true - end) + Lwt.catch begin fun () -> + match Data_encoding.Binary.write msg_encoding msg buf hdrlen with + | None -> Lwt.return_false + | Some len -> + match crypt with + | None -> + if len > maxlen then + Lwt.return_false + else begin + EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; + (* TODO timeout write ??? *) + Lwt_utils.write_mbytes ~len fd buf >>= fun () -> + Lwt.return_true + end + | Some crypt -> + let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in + let len = MBytes.length encbuf in + if len > maxlen then + Lwt.return_false + else begin + let lenbuf = MBytes.create 2 in + EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; + Lwt_utils.write_mbytes fd lenbuf >>= fun () -> + Lwt_utils.write_mbytes fd encbuf >>= fun () -> + Lwt.return_true + end + end (function - | Unix.Unix_error _ | End_of_file -> return_false - | e -> fail e) + | Unix.Unix_error _ | End_of_file -> Lwt.return_false + | e -> Lwt.fail e) + + (* The (internal) type of network events, those dispatched from peer + workers to the net and others internal to net workers. *) + type event = + | Disconnected of peer + | Bootstrap of peer + | Recv of peer * P.msg + | Peers of point list + | Contact of point * LU.file_descr + | Connected of peer + | Shutdown (* A peer handle, as a record-encoded object, abstract from the outside world. A hidden Lwt worker is associated to a peer at its creation and is killed using the disconnect callback by net workers (on shutdown of during maintenance). *) - type peer = { + and peer = { gid : gid ; public_key : Crypto_box.public_key ; point : point ; @@ -281,17 +296,6 @@ module Make (P: PARAMS) = struct get_metadata : gid -> P.metadata option ; } - (* The (internal) type of network events, those dispatched from peer - workers to the net and others internal to net workers. *) - type event = - | Disconnected of peer - | Bootstrap of peer - | Recv of peer * P.msg - | Peers of point list - | Contact of point * LU.file_descr - | Connected of peer - | Shutdown - (* Run-time point-or-gid indexed storage, one point is bound to at most one gid, which is the invariant we want to keep both for the connected peers table and the known peers one *) @@ -389,11 +393,11 @@ module Make (P: PARAMS) = struct config limits my_gid my_public_key my_secret_key my_proof_of_work socket (addr, port) push white_listed = (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = canceler () in + let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) let recv ~uncrypt buf = - pick [ recv_msg ~uncrypt socket buf ; - (cancelation () >>= fun () -> return Disconnect) ] in + Lwt.pick [ recv_msg ~uncrypt socket buf ; + (cancelation () >>= fun () -> Lwt.return Disconnect) ] in (* First step: send and receive credentials, makes no difference whether we're trying to connect to a peer or checking an incoming connection, both parties must first present themselves. *) @@ -406,41 +410,49 @@ module Make (P: PARAMS) = struct message_nonce = local_nonce ; port = config.incoming_port ; versions = P.supported_versions }) >>= fun _ -> - pick [ (LU.sleep limits.peer_answer_timeout >>= fun () -> return Disconnect) ; - recv_msg socket buf ] >>= function - | Connect { gid; port = listening_port; versions ; public_key ; proof_of_work ; message_nonce } -> + Lwt.pick + [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> + Lwt.return Disconnect ) ; + recv_msg socket buf ] >>= function + | Connect { gid; port = listening_port; versions ; public_key ; + proof_of_work ; message_nonce } -> debug "(%a) connection requested from %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; let work_proved = Crypto_box.check_proof_of_work public_key proof_of_work Crypto_box.default_target in - if not work_proved then - begin - debug "connection rejected (invalid proof of work)"; + if not work_proved then begin + debug "connection rejected (invalid proof of work)" ; cancel () - end - else - begin match common_version P.supported_versions versions with + end else begin + match common_version P.supported_versions versions with | None -> - debug "(%a) connection rejected (incompatible versions) from %a:%d" + debug + "(%a) connection rejected (incompatible versions) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () | Some version -> if config.closed_network then match listening_port with | Some port when white_listed (addr, port) -> - connected buf local_nonce version gid public_key message_nonce listening_port + connected + buf local_nonce version gid + public_key message_nonce listening_port | Some port -> - debug "(%a) connection rejected (out of the closed network) from %a:%d" + debug + "(%a) connection rejected (out of the closed network) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () | None -> - debug "(%a) connection rejected (out of the closed network) from %a:unknown" + debug + "(%a) connection rejected (out of the closed network) from %a:unknown" pp_gid my_gid Ipaddr.pp_hum addr ; cancel () else - connected buf local_nonce version gid public_key message_nonce listening_port - end + connected + buf local_nonce version gid + public_key message_nonce listening_port + end | Advertise peers -> (* alternatively, one can refuse a connection but reply with some peers, so we accept this info *) @@ -472,7 +484,7 @@ module Make (P: PARAMS) = struct let crypt buf = let nonce = get_nonce remote_nonce in Crypto_box.box my_secret_key public_key buf nonce in - let send p = send_msg ~crypt socket buf p >>= fun _ -> return () in + let send p = send_msg ~crypt socket buf p >>= fun _ -> Lwt.return_unit in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; listening_port ; version ; last_seen ; disconnect ; send } in @@ -500,7 +512,7 @@ module Make (P: PARAMS) = struct in (* Events for the main worker *) push (Connected peer) ; - on_cancel (fun () -> push (Disconnected peer) ; return ()) ; + on_cancel (fun () -> push (Disconnected peer) ; Lwt.return_unit) ; (* Launch the worker *) receiver () in @@ -508,12 +520,13 @@ module Make (P: PARAMS) = struct on_cancel (fun () -> (* send_msg ~crypt socket buf Disconnect >>= fun _ -> *) LU.close socket >>= fun _ -> - return ()) ; + Lwt.return_unit) ; let worker_name = Format.asprintf "(%a) connection handler for %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port in - ignore (worker ~safe:true worker_name ~run:(fun () -> connect buf) ~cancel) ; + ignore (Lwt_utils.worker worker_name + ~safe:true ~run:(fun () -> connect buf) ~cancel) ; (* return the canceler *) cancel @@ -523,7 +536,10 @@ module Make (P: PARAMS) = struct let open Data_encoding in splitted ~json: - (conv Ipaddr.to_string (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) string) + (conv + Ipaddr.to_string + (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) + string) ~binary: (union ~tag_size:`Uint8 [ case ~tag:4 @@ -619,50 +635,50 @@ module Make (P: PARAMS) = struct callback to fill the answers and returns a canceler function *) let discovery_answerer my_gid disco_port cancelation callback = (* init a UDP listening socket on the broadcast canal *) - catch - (fun () -> - let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.(setsockopt main_socket SO_BROADCAST true) ; - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; - return (Some main_socket)) + Lwt.catch begin fun () -> + let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in + LU.(setsockopt main_socket SO_BROADCAST true) ; + LU.(setsockopt main_socket SO_REUSEADDR true) ; + LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; + Lwt.return (Some main_socket) + end (fun exn -> debug "(%a) will not listen to discovery requests (%s)" pp_gid my_gid (string_of_unix_exn exn) ; - return None) >>= function - | None -> return () + Lwt.return_none) >>= function + | None -> Lwt.return_unit | Some main_socket -> (* the answering function *) let rec step () = let buffer = discovery_message my_gid 0 in let len = MBytes.length buffer in - pick [ (cancelation () >>= fun () -> return None) ; - (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> - return (Some r)) ] >>= function - | Some (len', LU.ADDR_INET (addr, _)) -> - if len' <> len then - step () (* drop bytes, better luck next time ! *) - else - answerable_discovery_message - (Data_encoding.Binary.of_bytes - discovery_message_encoding buffer) - my_gid - (fun _ port -> - catch - (fun () -> - let ipaddr = Ipaddr_unix.of_inet_addr addr in - let ipaddr = Ipaddr.(match ipaddr with V4 addr -> V6 (v6_of_v4 addr) | _ -> ipaddr) in - let addr = Ipaddr_unix.to_inet_addr ipaddr in - let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> - callback ipaddr port socket >>= fun () -> - return ()) - (fun _ -> (* ignore errors *) return ()) >>= fun () -> - step ()) - step - | Some (_, _) -> - step () - | None -> return () + Lwt.pick + [ (cancelation () >>= fun () -> Lwt.return_none) ; + (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> + Lwt.return (Some r)) ] >>= function + | None -> Lwt.return_unit + | Some (len', LU.ADDR_INET (addr, _)) when len' = len -> + answerable_discovery_message + (Data_encoding.Binary.of_bytes + discovery_message_encoding buffer) + my_gid + (fun _ port -> + Lwt.catch begin fun () -> + let ipaddr = + let open Ipaddr in + match Ipaddr_unix.of_inet_addr addr with + | V4 addr -> V6 (v6_of_v4 addr) + | V6 _ as addr -> addr in + let addr = Ipaddr_unix.to_inet_addr ipaddr in + let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in + LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> + callback ipaddr port socket >>= fun () -> + Lwt.return_unit + end + (fun _ -> (* ignore errors *) Lwt.return_unit) >>= fun () -> + step ()) + step + | Some _ -> step () in step () (* Sends dicover messages into space in an exponentially delayed loop, @@ -670,24 +686,28 @@ module Make (P: PARAMS) = struct let discovery_sender my_gid disco_port inco_port cancelation restart = let msg = discovery_message my_gid inco_port in let rec loop delay n = - catch - (fun () -> - let socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.setsockopt socket LU.SO_BROADCAST true ; - LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () -> - Lwt_utils.write_mbytes socket msg >>= fun _ -> - LU.close socket) + Lwt.catch begin fun () -> + let socket = LU.(socket PF_INET SOCK_DGRAM 0) in + LU.setsockopt socket LU.SO_BROADCAST true ; + let broadcast_ipv4 = Unix.inet_addr_of_string "255.255.255.255" in + LU.connect socket + LU.(ADDR_INET (broadcast_ipv4, disco_port)) >>= fun () -> + Lwt_utils.write_mbytes socket msg >>= fun _ -> + LU.close socket + end (fun _ -> debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; - return ()) >>= fun () -> - pick [ (LU.sleep delay >>= fun () -> return (Some (delay, n + 1))) ; - (cancelation () >>= fun () -> return None) ; - (LC.wait restart >>= fun () -> return (Some (0.1, 0))) ] >>= function + Lwt.return_unit) >>= fun () -> + Lwt.pick + [ (LU.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ; + (cancelation () >>= fun () -> Lwt.return_none) ; + (LC.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ] + >>= function | Some (delay, n) when n = 10 -> loop delay 9 | Some (delay, n) -> loop (delay *. 2.) n - | None -> return () + | None -> Lwt.return_unit in loop 0.2 1 (* Main network creation and initialisation function *) @@ -695,7 +715,7 @@ module Make (P: PARAMS) = struct (* we need to ignore SIGPIPEs *) Sys.(set_signal sigpipe Signal_ignore) ; (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = canceler () in + let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* create the internal event queue *) let enqueue_event, dequeue_event = let queue, enqueue = Lwt_stream.create () in @@ -709,17 +729,19 @@ module Make (P: PARAMS) = struct (fun () -> Lwt_stream.next queue), (fun () -> enqueue None) in - on_cancel (fun () -> close_msg_queue () ; return ()) ; + on_cancel (fun () -> close_msg_queue () ; Lwt.return_unit) ; (* fill the known peers pools from last time *) Data_encoding.Json.read_file config.peers_file >>= fun res -> - let known_peers, black_list, my_gid, my_public_key, my_secret_key, my_proof_of_work = + let known_peers, black_list, my_gid, + my_public_key, my_secret_key, my_proof_of_work = let init_peers () = let my_gid = fresh_gid () in let (my_secret_key, my_public_key) = Crypto_box.random_keypair () in let my_proof_of_work = - Crypto_box.generate_proof_of_work my_public_key Crypto_box.default_target in + Crypto_box.generate_proof_of_work + my_public_key Crypto_box.default_target in let known_peers = let source = { unreachable_since = None ; connections = None ; @@ -732,7 +754,8 @@ module Make (P: PARAMS) = struct PeerMap.empty config.known_peers in let black_list = BlackList.empty in - known_peers, black_list, my_gid, my_public_key, my_secret_key, my_proof_of_work in + known_peers, black_list, my_gid, + my_public_key, my_secret_key, my_proof_of_work in match res with | None -> let known_peers, black_list, my_gid, @@ -809,36 +832,41 @@ module Make (P: PARAMS) = struct in Data_encoding.Json.write_file config.peers_file json >>= fun _ -> debug "(%a) peer cache saved" pp_gid my_gid ; - return ()) ; + Lwt.return_unit) ; (* storage of active and not yet active peers *) let incoming = ref PointMap.empty in let connected = ref PeerMap.empty in (* peer welcoming (accept) loop *) let welcome () = match config.incoming_port with - | None -> (* no input port => no welcome worker *) return () + | None -> (* no input port => no welcome worker *) Lwt.return_unit | Some port -> (* open port for incoming connexions *) let addr = Unix.inet6_addr_any in - catch - (fun () -> - let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (addr, port))) ; - LU.listen main_socket limits.max_connections ; - return (Some main_socket)) + Lwt.catch begin fun () -> + let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in + LU.(setsockopt main_socket SO_REUSEADDR true) ; + LU.(bind main_socket (ADDR_INET (addr, port))) ; + LU.listen main_socket limits.max_connections ; + Lwt.return (Some main_socket) + end (fun exn -> debug "(%a) cannot accept incoming peers (%s)" pp_gid my_gid (string_of_unix_exn exn) ; - return None)>>= function + Lwt.return_none) + >>= function | None -> (* FIXME: run in degraded mode, better exit ? *) - return () + Lwt.return_unit | Some main_socket -> (* then loop *) let rec step () = - pick [ (LU.accept main_socket >>= fun (s, a) -> return (Some (s, a))) ; - (cancelation () >>= fun _ -> return None) ] >>= function + Lwt.pick + [ ( LU.accept main_socket >>= fun (s, a) -> + Lwt.return (Some (s, a)) ) ; + ( cancelation () >>= fun _ -> + Lwt.return_none ) ] + >>= function | None -> LU.close main_socket | Some (socket, addr) -> @@ -863,11 +891,17 @@ module Make (P: PARAMS) = struct let just_maintained = LC.create () in (* maintenance worker, returns when [connections] peers are connected *) let rec maintenance () = - pick [ (LU.sleep 120. >>= fun () -> return true) ; (* every two minutes *) - (LC.wait please_maintain >>= fun () -> return true) ; (* when asked *) - (LC.wait too_few_peers >>= fun () -> return true) ; (* limits *) - (LC.wait too_many_peers >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> + Lwt.pick + [ ( LU.sleep 120. >>= fun () -> + Lwt.return_true) ; (* every two minutes *) + ( LC.wait please_maintain >>= fun () -> + Lwt.return_true) ; (* when asked *) + ( LC.wait too_few_peers >>= fun () -> + Lwt.return_true) ; (* limits *) + ( LC.wait too_many_peers >>= fun () -> + Lwt.return_true) ; + ( cancelation () >>= fun () -> + Lwt.return_false) ] >>= fun continue -> let rec maintain () = let n_connected = PeerMap.cardinal !connected in if n_connected >= limits.expected_connections @@ -895,24 +929,30 @@ module Make (P: PARAMS) = struct not (PeerMap.mem_by_gid gid !connected)) in let rec do_contact_loop strec = match strec with - | 0, _ -> return true - | _, [] -> return false (* we didn't manage to contact enough peers *) + | 0, _ -> Lwt.return_true + | _, [] -> + Lwt.return_false (* we didn't manage to contact enough peers *) | nb, ((addr, port), gid, source) :: tl -> (* we try to open a connection *) - let socket = LU.(socket (match addr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in + let socket = + let open LU in + let open Ipaddr in + let family = + match addr with V4 _ -> PF_INET | V6 _ -> PF_INET6 in + socket family SOCK_STREAM 0 in let uaddr = Ipaddr_unix.to_inet_addr addr in - catch - (fun () -> - debug "(%a) trying to connect to %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port; - Lwt.pick - [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; - LU.connect socket (LU.ADDR_INET (uaddr, port)) - ] >>= fun () -> - debug "(%a) connected to %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port; - enqueue_event (Contact ((addr, port), socket)) ; - return (nb - 1)) + Lwt.catch begin fun () -> + debug "(%a) trying to connect to %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + Lwt.pick + [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; + LU.connect socket (LU.ADDR_INET (uaddr, port)) + ] >>= fun () -> + debug "(%a) connected to %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port; + enqueue_event (Contact ((addr, port), socket)) ; + Lwt.return (nb - 1) + end (fun exn -> debug "(%a) connection failed to %a:%d (%s)" pp_gid my_gid Ipaddr.pp_hum addr port @@ -924,7 +964,7 @@ module Make (P: PARAMS) = struct { source with unreachable_since = Some now } !known_peers ; LU.close socket >>= fun () -> - return nb) >>= fun nrec -> + Lwt.return nb) >>= fun nrec -> do_contact_loop (nrec, tl) in do_contact_loop (nb, contactable) in @@ -932,21 +972,25 @@ module Make (P: PARAMS) = struct debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ; contact to_contact >>= function | true -> (* enough contacts, now wait for connections *) - pick [ (LC.wait new_peer >>= fun _ -> return true) ; - (LU.sleep 1.0 >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then maintain () else return () + Lwt.pick + [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ; + (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ; + (cancelation () >>= fun () -> Lwt.return_false) ] + >>= fun continue -> + if continue then maintain () else Lwt.return_unit | false -> (* not enough contacts, ask the pals of our pals, discover the local network and then wait *) LC.broadcast restart_discovery () ; (PeerMap.iter (fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap)) !connected ; - pick [ (LC.wait new_peer >>= fun _ -> return true) ; - (LC.wait new_contact >>= fun _ -> return true) ; - (LU.sleep 1.0 >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then maintain () else return ()) + Lwt.pick + [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ; + (LC.wait new_contact >>= fun _ -> Lwt.return_true) ; + (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ; + (cancelation () >>= fun () -> Lwt.return_false) ] + >>= fun continue -> + if continue then maintain () else Lwt.return_unit) else (* too many peers, start the russian roulette *) let to_kill = n_connected - limits.max_connections in @@ -955,13 +999,13 @@ module Make (P: PARAMS) = struct (fun _ _ peer (i, t) -> if i = 0 then (0, t) else (i - 1, t >>= fun () -> peer.disconnect ())) - !connected (to_kill, return ())) >>= fun () -> + !connected (to_kill, Lwt.return_unit)) >>= fun () -> (* and directly skip to the next maintenance request *) LC.broadcast just_maintained () ; debug "(%a) maintenance step ended" pp_gid my_gid ; maintenance () in - if continue then maintain () else return () + if continue then maintain () else Lwt.return_unit in (* select the peers to send on a bootstrap request *) let bootstrap_peers () = @@ -969,7 +1013,6 @@ module Make (P: PARAMS) = struct PeerMap.bindings !known_peers |> List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |> List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> - (* HERE *) (* we simply send the first 50 (or less) known peers *) List.fold_left (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) @@ -977,8 +1020,9 @@ module Make (P: PARAMS) = struct in (* main internal event handling worker *) let rec main () = - pick [ dequeue_event () ; - cancelation () >>= fun () -> return Shutdown ] >>= fun event -> + Lwt.pick + [ dequeue_event () ; + cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> match event with | Disconnected peer -> debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; @@ -1002,7 +1046,8 @@ module Make (P: PARAMS) = struct known_peers := PeerMap.remove_by_point point !known_peers ; known_peers := PeerMap.remove_by_gid peer.gid !known_peers ; (* then assign *) - known_peers := PeerMap.update point ~gid:peer.gid source !known_peers + known_peers := + PeerMap.update point ~gid:peer.gid source !known_peers in update @@ try match PeerMap.by_gid peer.gid !known_peers with | { connections = None ; white_listed } -> @@ -1090,12 +1135,13 @@ module Make (P: PARAMS) = struct peers ; main () | Shutdown -> - return () + Lwt.return_unit in (* blacklist filter *) let rec unblock () = - pick [ (Lwt_unix.sleep 20. >>= fun _ -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> + Lwt.pick + [ (Lwt_unix.sleep 20. >>= fun _ -> Lwt.return_true) ; + (cancelation () >>= fun () -> Lwt.return_false) ] >>= fun continue -> if continue then let now = Unix.gettimeofday () in black_list := BlackList.fold @@ -1110,20 +1156,33 @@ module Make (P: PARAMS) = struct PeerMap.update point ?gid source map) !known_peers PeerMap.empty ; unblock () - else return () + else Lwt.return_unit in (* launch all workers *) - let welcome = worker (Format.asprintf "(%a) welcome" pp_gid my_gid) welcome cancel in - let maintenance = worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in - let main = worker (Format.asprintf "(%a) reception" pp_gid my_gid) main cancel in - let unblock = worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) unblock cancel in + let welcome = + Lwt_utils.worker + (Format.asprintf "(%a) welcome" pp_gid my_gid) + welcome cancel in + let maintenance = + Lwt_utils.worker + (Format.asprintf "(%a) maintenance" pp_gid my_gid) + maintenance cancel in + let main = + Lwt_utils.worker + (Format.asprintf "(%a) reception" pp_gid my_gid) + main cancel in + let unblock = + Lwt_utils.worker + (Format.asprintf "(%a) unblacklister" pp_gid my_gid) + unblock cancel in let discovery_answerer = let buf = MBytes.create 0x100_000 in match config.discovery_port with | Some disco_port -> let answerer () = - discovery_answerer my_gid disco_port cancelation @@ fun addr port socket -> - (* do not reply to ourselves or conncted peers *) + discovery_answerer + my_gid disco_port cancelation @@ fun addr port socket -> + (* do not reply to ourselves or connected peers *) if not (PeerMap.mem_by_point (addr, port) !connected) && (try match PeerMap.gid_by_point (addr, port) !known_peers with | Some gid -> not (PeerMap.mem_by_gid gid !connected) @@ -1136,47 +1195,53 @@ module Make (P: PARAMS) = struct LU.close socket end else begin enqueue_event (Contact ((addr, port), socket)) ; - return () + Lwt.return_unit end else LU.close socket in - worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) answerer cancel - | _ -> return () in + Lwt_utils.worker + (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) + answerer cancel + | _ -> Lwt.return_unit in let discovery_sender = match config.incoming_port, config.discovery_port with | Some inco_port, Some disco_port -> let sender () = - discovery_sender my_gid disco_port inco_port cancelation restart_discovery in - worker (Format.asprintf "(%a) discovery sender" pp_gid my_gid) sender cancel - | _ -> return () in + discovery_sender + my_gid disco_port inco_port cancelation restart_discovery in + Lwt_utils.worker + (Format.asprintf "(%a) discovery sender" pp_gid my_gid) + sender cancel + | _ -> Lwt.return_unit in (* net manipulation callbacks *) let rec shutdown () = debug "(%a) starting network shutdown" pp_gid my_gid ; (* stop accepting clients *) cancel () >>= fun () -> (* wait for both workers to end *) - join [ welcome ; main ; maintenance ; unblock ; - discovery_answerer ; discovery_sender ] >>= fun () -> + Lwt.join [ welcome ; main ; maintenance ; unblock ; + discovery_answerer ; discovery_sender ] >>= fun () -> (* properly shutdown all peers *) let cancelers = PeerMap.fold (fun point _ peer res -> (peer.disconnect () >>= fun () -> connected := PeerMap.remove_by_point point !connected ; - return ()) :: res) + Lwt.return_unit) :: res) !connected @@ PointMap.fold (fun point canceler res -> (canceler () >>= fun () -> incoming := PointMap.remove point !incoming ; - return ()) :: res) + Lwt.return_unit) :: res) !incoming @@ [] in - join cancelers >>= fun () -> + Lwt.join cancelers >>= fun () -> debug "(%a) network shutdown complete" pp_gid my_gid ; - return () + Lwt.return_unit and peers () = PeerMap.fold (fun _ _ peer r -> peer :: r) !connected [] - and find_peer gid = try Some (PeerMap.by_gid gid !connected) with Not_found -> None + and find_peer gid = + try Some (PeerMap.by_gid gid !connected) with Not_found -> None and peer_info (peer : peer) = { gid = peer.gid ; addr = fst peer.point ; @@ -1186,7 +1251,7 @@ module Make (P: PARAMS) = struct and recv_from () = dequeue_msg () and send_to peer msg = - peer.send (Message msg) >>= fun _ -> return () + peer.send (Message msg) >>= fun _ -> Lwt.return_unit and try_send peer msg = Lwt.async (fun () -> peer.send (Message msg)); true and broadcast msg = @@ -1243,12 +1308,15 @@ module Make (P: PARAMS) = struct and get_metadata _gid = None (* TODO: implement *) and set_metadata _gid _meta = () (* TODO: implement *) in - let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in + let net = + { shutdown ; peers ; find_peer ; + recv_from ; send_to ; try_send ; broadcast ; + blacklist ; whitelist ; maintain ; roll ; + peer_info ; get_metadata ; set_metadata } in (* main thread, returns after first successful maintenance *) maintain () >>= fun () -> debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; - return net + Lwt.return net let faked_network = let infinity, wakeup = Lwt.wait () in @@ -1268,8 +1336,10 @@ module Make (P: PARAMS) = struct let peer_info _ = assert false in let get_metadata _ = None in let set_metadata _ _ = () in - { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } + { shutdown ; peers ; find_peer ; + recv_from ; send_to ; try_send ; broadcast ; + blacklist ; whitelist ; maintain ; roll ; + peer_info ; get_metadata ; set_metadata } (* Plug toplevel functions to callback calls. *) From 26c84de550a4a06c3850891f183b92abc1a376bf Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 21:46:26 +0100 Subject: [PATCH 02/14] Shell: introduce `Lwt_pipe` --- src/Makefile | 2 + src/utils/lwt_pipe.ml | 105 +++++++++++++++++++++++++++++++++++++++++ src/utils/lwt_pipe.mli | 54 +++++++++++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 src/utils/lwt_pipe.ml create mode 100644 src/utils/lwt_pipe.mli diff --git a/src/Makefile b/src/Makefile index 316ca629d..f550b9fcf 100644 --- a/src/Makefile +++ b/src/Makefile @@ -111,6 +111,7 @@ UTILS_LIB_INTFS := \ utils/error_monad.mli \ utils/logging.mli \ utils/lwt_utils.mli \ + utils/lwt_pipe.mli \ utils/IO.mli \ UTILS_LIB_IMPLS := \ @@ -128,6 +129,7 @@ UTILS_LIB_IMPLS := \ utils/error_monad.ml \ utils/logging.ml \ utils/lwt_utils.ml \ + utils/lwt_pipe.ml \ utils/IO.ml \ UTILS_PACKAGES := \ diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml new file mode 100644 index 000000000..f6348218a --- /dev/null +++ b/src/utils/lwt_pipe.ml @@ -0,0 +1,105 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Lwt.Infix + +type 'a t = + { queue : 'a Queue.t ; + size : int ; + mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ; + mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option } + +let create ~size = + { queue = Queue.create () ; + size ; + push_waiter = None ; + pop_waiter = None } + +let notify_push q = + match q.push_waiter with + | None -> () + | Some (_, w) -> + q.push_waiter <- None ; + Lwt.wakeup_later w () + +let notify_pop q = + match q.pop_waiter with + | None -> () + | Some (_, w) -> + q.pop_waiter <- None ; + Lwt.wakeup_later w () + +let wait_push q = + match q.push_waiter with + | Some (t, _) -> t + | None -> + let waiter, wakener = Lwt.wait () in + q.push_waiter <- Some (waiter, wakener) ; + waiter + +let wait_pop q = + match q.pop_waiter with + | Some (t, _) -> t + | None -> + let waiter, wakener = Lwt.wait () in + q.pop_waiter <- Some (waiter, wakener) ; + waiter + +let rec push ({ queue ; size } as q) elt = + if Queue.length queue < size then begin + Queue.push elt queue ; + notify_push q ; + Lwt.return_unit + end else + wait_pop q >>= fun () -> + push q elt + +let rec push_now ({ queue; size } as q) elt = + Queue.length queue < size && begin + Queue.push elt queue ; + notify_push q ; + true + end + +let rec pop ({ queue } as q) = + if not (Queue.is_empty queue) then + let elt = Queue.pop queue in + notify_pop q ; + Lwt.return elt + else + wait_push q >>= fun () -> + pop q + +let rec peek ({ queue } as q) = + if not (Queue.is_empty queue) then + let elt = Queue.peek queue in + Lwt.return elt + else + wait_push q >>= fun () -> + peek q + +let pop_now_exn ({ queue } as q) = + let elt = Queue.pop queue in + notify_pop q ; + elt + +let pop_now q = + match pop_now_exn q with + | exception Queue.Empty -> None + | elt -> Some elt + +let length { queue } = Queue.length queue +let is_empty { queue } = Queue.is_empty queue + +let rec values_available q = + if is_empty q then + wait_push q >>= fun () -> + values_available q + else + Lwt.return_unit diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli new file mode 100644 index 000000000..f880522d8 --- /dev/null +++ b/src/utils/lwt_pipe.mli @@ -0,0 +1,54 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Data queues similar to the [Pipe] module in Jane Street's [Async] + library. They are implemented with [Queue]s, limited in size, and + use lwt primitives for concurrent access. *) + +type 'a t +(** Type of queues holding values of type ['a]. *) + +val create : size:int -> 'a t +(** [create ~size] is an empty queue that can hold max [size] + elements. *) + +val push : 'a t -> 'a -> unit Lwt.t +(** [push q v] is a thread that blocks while [q] contains more + than [size] elements, then adds [v] at the end of [q]. *) + +val pop : 'a t -> 'a Lwt.t +(** [pop q] is a thread that blocks while [q] is empty, then + removes and returns the first element in [q]. *) + +val peek : 'a t -> 'a Lwt.t +(** [peek] is like [pop] except it does not removes the first + element. *) + +val values_available : 'a t -> unit Lwt.t +(** [values_available] is like [peek] but it ignores the value + returned. *) + +val push_now : 'a t -> 'a -> bool +(** [push_now q v] adds [v] at the ends of [q] immediately and returns + [false] if [q] is currently full, [true] otherwise. *) + +val pop_now : 'a t -> 'a option +(** [pop_now q] maybe removes and returns the first element in [q] if + [q] contains at least one element. *) + +val pop_now_exn : 'a t -> 'a +(** [pop_now_exn q] removes and returns the first element in [q] if + [q] contains at least one element, or raise [Empty] otherwise. *) + +val length : 'a t -> int +(** [length q] is the number of elements in [q]. *) + +val is_empty : 'a t -> bool +(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *) + From 16a3c88b1f513692451c9bdf3c5f8cc3deb95240 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 21:54:32 +0100 Subject: [PATCH 03/14] Shell: introduce `Moving_average` --- src/Makefile | 2 ++ src/utils/moving_average.ml | 37 ++++++++++++++++++++++++++++++++++++ src/utils/moving_average.mli | 34 +++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+) create mode 100644 src/utils/moving_average.ml create mode 100644 src/utils/moving_average.mli diff --git a/src/Makefile b/src/Makefile index f550b9fcf..003346a14 100644 --- a/src/Makefile +++ b/src/Makefile @@ -113,6 +113,7 @@ UTILS_LIB_INTFS := \ utils/lwt_utils.mli \ utils/lwt_pipe.mli \ utils/IO.mli \ + utils/moving_average.mli \ UTILS_LIB_IMPLS := \ utils/mBytes.ml \ @@ -131,6 +132,7 @@ UTILS_LIB_IMPLS := \ utils/lwt_utils.ml \ utils/lwt_pipe.ml \ utils/IO.ml \ + utils/moving_average.ml \ UTILS_PACKAGES := \ base64 \ diff --git a/src/utils/moving_average.ml b/src/utils/moving_average.ml new file mode 100644 index 000000000..00b79977c --- /dev/null +++ b/src/utils/moving_average.ml @@ -0,0 +1,37 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +class type ma = object + method add_float : float -> unit + method add_int : int -> unit + method get : float +end + +class virtual base ?(init = 0.) () = object (self) + val mutable acc : float = init + method virtual add_float : float -> unit + method add_int x = self#add_float (float_of_int x) + method get = acc +end + +class sma ?init () = object + inherit base ?init () + val mutable i = match init with None -> 0 | _ -> 1 + method add_float x = + acc <- (acc +. (x -. acc) /. (float_of_int @@ succ i)) ; + i <- succ i +end + +class ema ?init ~alpha () = object + inherit base ?init () + val alpha = alpha + method add_float x = + acc <- alpha *. x +. (1. -. alpha) *. acc +end + diff --git a/src/utils/moving_average.mli b/src/utils/moving_average.mli new file mode 100644 index 000000000..a5768ee51 --- /dev/null +++ b/src/utils/moving_average.mli @@ -0,0 +1,34 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Moving averages. The formulas are from Wikipedia + [https://en.wikipedia.org/wiki/Moving_average] *) + +class type ma = object + method add_float : float -> unit + method add_int : int -> unit + method get : float +end +(** Common class type for objects computing a cumulative moving + average of some flavor. In a cumulative moving average, the data + arrive in an ordered datum stream, and the user would like to get + the average of all of the data up until the current datum + point. The method [add_float] and [add_int] are used to add the + next datum. The method [get] and [get_exn] are used to compute the + moving average up until the current datum point. *) + +class sma : ?init:float -> unit -> ma +(** [sma ?init ()] is an object that computes the Simple Moving + Average of a datum stream. [SMA(n+1) = SMA(n) + (x_(n+1) / SMA(n)) + / (n+1)] *) + +class ema : ?init:float -> alpha:float -> unit -> ma +(** [ema ?init ~alpha ()] is an object that computes the Exponential + Moving Average of a datum stream. [EMA(n+1) = alpha * x_(n+1) + + (1 - alpha) * x_n] *) From d41c05a0660884f88a7f84f3b8dcf3e0f84a2481 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 22:18:00 +0100 Subject: [PATCH 04/14] Shell: minor rewording in `P2p` --- src/node/net/p2p.ml | 30 ++++++++++++++---------------- src/node/net/p2p.mli | 16 ++++++++-------- src/node/shell/tezos_p2p.mli | 8 ++++---- src/node_main.ml | 2 +- test/test_p2p.ml | 6 +++--- 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 777e769af..5051819b9 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -33,7 +33,7 @@ let version_encoding = (req "minor" int8)) type limits = { - max_packet_size : int ; + max_message_size : int ; peer_answer_timeout : float ; expected_connections : int ; min_connections : int ; @@ -121,14 +121,12 @@ end module Make (P: PARAMS) = struct - (* Low-level network protocol packets (internal). The protocol is + (* Low-level network protocol messages (internal). The protocol is completely symmetrical and asynchronous. First both peers must - present their credentials with a [Connect] packet, then any - combination of the other packets can be received at any time. An + present their credentials with a [Connect] message, then any + combination of the other messages can be received at any time. An exception is the [Disconnect] message, which should mark the end of - transmission (and needs not being replied). The [Unkown] packet is - not a real kind of packet, it means that something indecypherable - was transmitted. *) + transmission (and needs not being replied). *) type msg = | Connect of { gid : string ; @@ -282,7 +280,7 @@ module Make (P: PARAMS) = struct type net = { recv_from : unit -> (peer * P.msg) Lwt.t ; send_to : peer -> P.msg -> unit Lwt.t ; - try_send : peer -> P.msg -> bool ; + try_send_to : peer -> P.msg -> bool ; broadcast : P.msg -> unit ; blacklist : ?duration:float -> addr -> unit ; whitelist : peer -> unit ; @@ -496,11 +494,11 @@ module Make (P: PARAMS) = struct pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; None | Some _ as res -> res in - (* The packet reception loop. *) + (* The message reception loop. *) let rec receiver () = - recv ~uncrypt buf >>= fun packet -> + recv ~uncrypt buf >>= fun message -> last := Unix.gettimeofday () ; - match packet with + match message with | Connect _ | Disconnect -> debug "(%a) disconnected (by peer) %a @@ %a:%d" @@ -1252,7 +1250,7 @@ module Make (P: PARAMS) = struct dequeue_msg () and send_to peer msg = peer.send (Message msg) >>= fun _ -> Lwt.return_unit - and try_send peer msg = + and try_send_to peer msg = Lwt.async (fun () -> peer.send (Message msg)); true and broadcast msg = PeerMap.iter @@ -1310,7 +1308,7 @@ module Make (P: PARAMS) = struct in let net = { shutdown ; peers ; find_peer ; - recv_from ; send_to ; try_send ; broadcast ; + recv_from ; send_to ; try_send_to ; broadcast ; blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in (* main thread, returns after first successful maintenance *) @@ -1327,7 +1325,7 @@ module Make (P: PARAMS) = struct let find_peer _ = None in let recv_from () = infinity in let send_to _ _ = Lwt.return_unit in - let try_send _ _ = true in + let try_send_to _ _ = true in let broadcast _ = () in let blacklist ?duration _ = ignore duration ; () in let whitelist _ = () in @@ -1337,7 +1335,7 @@ module Make (P: PARAMS) = struct let get_metadata _ = None in let set_metadata _ _ = () in { shutdown ; peers ; find_peer ; - recv_from ; send_to ; try_send ; broadcast ; + recv_from ; send_to ; try_send_to ; broadcast ; blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } @@ -1349,7 +1347,7 @@ module Make (P: PARAMS) = struct let peer_info net peer = net.peer_info peer let recv net = net.recv_from () let send net peer msg = net.send_to peer msg - let try_send net peer = net.try_send peer + let try_send net peer msg = net.try_send_to peer msg let broadcast net msg = net.broadcast msg let maintain net = net.maintain () let roll net = net.roll () diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index c96059b8f..0239bb4d6 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -39,8 +39,8 @@ type config = { (** Network capacities *) type limits = { - (** Maximum length in bytes of network messages' payload *) - max_packet_size : int ; + (** Maximum length in bytes of network messages *) + max_message_size : int ; (** Delay after which a non responding peer is considered dead *) peer_answer_timeout : float ; (** Minimum number of connections to reach when staring / maitening *) @@ -129,18 +129,18 @@ module Make (P : PARAMS) : sig val get_metadata : net -> gid -> P.metadata option val set_metadata : net -> gid -> P.metadata -> unit - (** Wait for a payload from any peer in the network *) + (** Wait for a message from any peer in the network *) val recv : net -> (peer * P.msg) Lwt.t - (** Send a payload to a peer and wait for it to be in the tube *) + (** [send net peer msg] is a thread that returns when [msg] has been + successfully enqueued in the send queue. *) val send : net -> peer -> P.msg -> unit Lwt.t - (** Send a payload to a peer without waiting for the result. Return - [true] if the message can be enqueued in the peer's output queue - or [false] otherwise. *) + (** [try_send net peer msg] is [true] if [msg] has been added to the + send queue for [peer], [false] otherwise *) val try_send : net -> peer -> P.msg -> bool - (** Send a payload to all peers *) + (** Send a message to all peers *) val broadcast : net -> P.msg -> unit (** Shutdown the connection to all peers at this address and stop the diff --git a/src/node/shell/tezos_p2p.mli b/src/node/shell/tezos_p2p.mli index 94262be34..9d0b64e09 100644 --- a/src/node/shell/tezos_p2p.mli +++ b/src/node/shell/tezos_p2p.mli @@ -67,12 +67,12 @@ type msg = (** Wait for a payload from any peer in the network *) val recv : net -> (peer * msg) Lwt.t -(** Send a payload to a peer and wait for it to be in the tube *) +(** [send net peer msg] is a thread that returns when [msg] has been + successfully enqueued in the send queue. *) val send : net -> peer -> msg -> unit Lwt.t -(** Send a payload to a peer without waiting for the result. Return - [true] if the msg can be enqueued in the peer's output queue - or [false] otherwise. *) +(** [try_send net peer msg] is [true] if [msg] has been added to the + send queue for [peer], [false] otherwise *) val try_send : net -> peer -> msg -> bool (** Send a payload to all peers *) diff --git a/src/node_main.ml b/src/node_main.ml index e8a663dbb..2134b5c9a 100644 --- a/src/node_main.ml +++ b/src/node_main.ml @@ -396,7 +396,7 @@ let init_node { sandbox ; sandbox_param ; | Some _ -> None | None -> let limits = - { max_packet_size = 10_000 ; + { max_message_size = 10_000 ; peer_answer_timeout = 5. ; expected_connections ; min_connections ; diff --git a/test/test_p2p.ml b/test/test_p2p.ml index 20a696cb0..e4f682711 100644 --- a/test/test_p2p.ml +++ b/test/test_p2p.ml @@ -39,7 +39,7 @@ let main () = let known_peers = ref [] in let closed_network = ref false in - let max_packet_size = ref 1024 in + let max_message_size = ref 1024 in let peer_answer_timeout = ref 10. in let expected_connections = ref 1 in let min_connections = ref 0 in @@ -52,7 +52,7 @@ let main () = "-peers-file", Set_string peers_file, " Peers filepath"; "-closed", Set closed_network, " Closed network mode"; - "-max-packet-size", Set_int max_packet_size, "int Max size of packets"; + "-max-message-size", Set_int max_message_size, "int Max size of messages"; "-peer-answer-timeout", Set_float peer_answer_timeout, "float Number of seconds"; "-expected-connections", Set_int expected_connections, "conns Expected connections"; "-min-connections", Set_int min_connections, "conns Minimal number of connections"; @@ -74,7 +74,7 @@ let main () = } in let limits = { - max_packet_size = !max_packet_size; + max_message_size = !max_message_size; peer_answer_timeout = !peer_answer_timeout; expected_connections = !expected_connections; min_connections = !min_connections; From 41d5bbe989e976c3d882daf310dc1b954c1ffac0 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 22:40:49 +0100 Subject: [PATCH 05/14] Shell: Use some `Error_monad` in `P2p` --- src/node/net/p2p.ml | 82 +++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 5051819b9..9cbb372d3 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -13,6 +13,13 @@ module LC = Lwt_condition open Lwt.Infix open Logging.Net +type error += Encoding_error +type error += Message_too_big +type error += Write_would_block +type error += Decipher_error +type error += Canceled +type error += Timeout + (* public types *) type addr = Ipaddr.t type port = int @@ -195,51 +202,46 @@ module Make (P: PARAMS) = struct match uncrypt buf with | None -> (* TODO track invalid message *) - Lwt.return Disconnect + Error_monad.fail Decipher_error | Some buf -> match Data_encoding.Binary.of_bytes msg_encoding buf with | None -> (* TODO track invalid message *) - Lwt.return Disconnect + Error_monad.fail Encoding_error | Some msg -> - Lwt.return msg + Error_monad.return (len, msg) end - (function - | Unix.Unix_error _ | End_of_file -> Lwt.return Disconnect - | e -> Lwt.fail e) + (fun exn -> Lwt.return @@ Error_monad.error_exn exn) (* send a message over a TCP socket *) let send_msg ?crypt fd buf msg = Lwt.catch begin fun () -> match Data_encoding.Binary.write msg_encoding msg buf hdrlen with - | None -> Lwt.return_false + | None -> Error_monad.fail Encoding_error | Some len -> match crypt with | None -> - if len > maxlen then - Lwt.return_false + if len > maxlen then Error_monad.fail Message_too_big else begin EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; (* TODO timeout write ??? *) Lwt_utils.write_mbytes ~len fd buf >>= fun () -> - Lwt.return_true + Error_monad.return len end | Some crypt -> let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in let len = MBytes.length encbuf in - if len > maxlen then - Lwt.return_false + if len > maxlen then Error_monad.fail Message_too_big else begin let lenbuf = MBytes.create 2 in EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; Lwt_utils.write_mbytes fd lenbuf >>= fun () -> Lwt_utils.write_mbytes fd encbuf >>= fun () -> - Lwt.return_true + Error_monad.return len end end - (function - | Unix.Unix_error _ | End_of_file -> Lwt.return_false - | e -> Lwt.fail e) + (fun exn -> Lwt.return @@ Error_monad.error_exn exn) + (* The (internal) type of network events, those dispatched from peer workers to the net and others internal to net workers. *) @@ -393,9 +395,11 @@ module Make (P: PARAMS) = struct (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) - let recv ~uncrypt buf = - Lwt.pick [ recv_msg ~uncrypt socket buf ; - (cancelation () >>= fun () -> Lwt.return Disconnect) ] in + let recv ?uncrypt buf = + Lwt.pick [ recv_msg ?uncrypt socket buf ; + (cancelation () >>= fun () -> Error_monad.fail Canceled) ] + >>=? fun (_size, message) -> + return message in (* First step: send and receive credentials, makes no difference whether we're trying to connect to a peer or checking an incoming connection, both parties must first present themselves. *) @@ -410,10 +414,14 @@ module Make (P: PARAMS) = struct versions = P.supported_versions }) >>= fun _ -> Lwt.pick [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> - Lwt.return Disconnect ) ; - recv_msg socket buf ] >>= function - | Connect { gid; port = listening_port; versions ; public_key ; - proof_of_work ; message_nonce } -> + Error_monad.fail Timeout ) ; + recv buf ] >>= function + | Error err -> + debug "(%a) error receiving from %a:%d: %a" + pp_gid my_gid Ipaddr.pp_hum addr port Error_monad.pp_print_error err ; + cancel () + | Ok (Connect { gid; port = listening_port; versions ; + public_key ; proof_of_work ; message_nonce }) -> debug "(%a) connection requested from %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; let work_proved = @@ -451,18 +459,18 @@ module Make (P: PARAMS) = struct buf local_nonce version gid public_key message_nonce listening_port end - | Advertise peers -> + | Ok (Advertise peers) -> (* alternatively, one can refuse a connection but reply with some peers, so we accept this info *) debug "(%a) new peers received from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; push (Peers peers) ; cancel () - | Disconnect -> + | Ok Disconnect -> debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () - | _ -> + | Ok _ -> debug "(%a) connection rejected (bad connection request) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () @@ -496,17 +504,19 @@ module Make (P: PARAMS) = struct | Some _ as res -> res in (* The message reception loop. *) let rec receiver () = - recv ~uncrypt buf >>= fun message -> - last := Unix.gettimeofday () ; - match message with - | Connect _ - | Disconnect -> + recv ~uncrypt buf >>= function + | Error err -> + debug "(%a) error receiving: %a" + pp_gid my_gid Error_monad.pp_print_error err ; + cancel () + | Ok Connect _ + | Ok Disconnect -> debug "(%a) disconnected (by peer) %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; cancel () - | Bootstrap -> push (Bootstrap peer) ; receiver () - | Advertise peers -> push (Peers peers) ; receiver () - | Message msg -> push (Recv (peer, msg)) ; receiver () + | Ok Bootstrap -> push (Bootstrap peer) ; receiver () + | Ok Advertise peers -> push (Peers peers) ; receiver () + | Ok Message msg -> push (Recv (peer, msg)) ; receiver () in (* Events for the main worker *) push (Connected peer) ; @@ -1176,6 +1186,7 @@ module Make (P: PARAMS) = struct let discovery_answerer = let buf = MBytes.create 0x100_000 in match config.discovery_port with + | None -> Lwt.return_unit | Some disco_port -> let answerer () = discovery_answerer @@ -1198,8 +1209,7 @@ module Make (P: PARAMS) = struct else LU.close socket in Lwt_utils.worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) - answerer cancel - | _ -> Lwt.return_unit in + answerer cancel in let discovery_sender = match config.incoming_port, config.discovery_port with | Some inco_port, Some disco_port -> From dc2084d9933d6ce7f71e86b56e668fd197451b91 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 23:01:37 +0100 Subject: [PATCH 06/14] Shell: export `P2p.gid` --- src/node/net/p2p.ml | 9 +++++++-- src/node/net/p2p.mli | 4 ++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 9cbb372d3..491e04526 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -63,6 +63,8 @@ let gid_length = 16 let pp_gid ppf gid = Format.pp_print_string ppf (Hex_encode.hex_encode gid) +let zero_gid = String.make 16 '\x00' + (* the common version for a pair of peers, if any, is the maximum one, in lexicographic order *) let common_version la lb = @@ -280,6 +282,7 @@ module Make (P: PARAMS) = struct outside world. Hidden Lwt workers are associated to a net at its creation and can be killed using the shutdown callback. *) type net = { + gid : gid ; recv_from : unit -> (peer * P.msg) Lwt.t ; send_to : peer -> P.msg -> unit Lwt.t ; try_send_to : peer -> P.msg -> bool ; @@ -1317,7 +1320,7 @@ module Make (P: PARAMS) = struct and set_metadata _gid _meta = () (* TODO: implement *) in let net = - { shutdown ; peers ; find_peer ; + { gid = my_gid ; shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send_to ; broadcast ; blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in @@ -1327,6 +1330,7 @@ module Make (P: PARAMS) = struct Lwt.return net let faked_network = + let gid = String.make 16 '\000' in let infinity, wakeup = Lwt.wait () in let shutdown () = Lwt.wakeup_exn wakeup Lwt_stream.Empty; @@ -1344,13 +1348,14 @@ module Make (P: PARAMS) = struct let peer_info _ = assert false in let get_metadata _ = None in let set_metadata _ _ = () in - { shutdown ; peers ; find_peer ; + { gid ; shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send_to ; broadcast ; blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } (* Plug toplevel functions to callback calls. *) + let gid net = net.gid let shutdown net = net.shutdown () let peers net = net.peers () let find_peer net gid = net.find_peer gid diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index 0239bb4d6..6ef313e58 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -55,6 +55,7 @@ type limits = { (** A global identifier for a peer, a.k.a. an identity *) type gid +val pp_gid : Format.formatter -> gid -> unit type 'msg encoding = Encoding : { tag: int ; @@ -97,6 +98,9 @@ module Make (P : PARAMS) : sig (** Main network initialisation function *) val bootstrap : config:config -> limits:limits -> net Lwt.t + (** Return one's gid *) + val gid : net -> gid + (** A maintenance operation : try and reach the ideal number of peers *) val maintain : net -> unit Lwt.t From 56a58cc962e981476798a9cee413e42ff26a1d2c Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 23:31:40 +0100 Subject: [PATCH 07/14] Shell: use bounded `Lwt_pipe` in `P2p` --- src/node/net/p2p.ml | 111 ++++++++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 41 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 491e04526..870ff96ba 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -269,6 +269,9 @@ module Make (P: PARAMS) = struct last_seen : unit -> float ; disconnect : unit -> unit Lwt.t; send : msg -> unit Lwt.t ; + try_send : msg -> bool ; + reader : event Lwt_pipe.t ; + writer : msg Lwt_pipe.t ; } type peer_info = { @@ -394,7 +397,7 @@ module Make (P: PARAMS) = struct canceler. *) let connect_to_peer config limits my_gid my_public_key my_secret_key my_proof_of_work - socket (addr, port) push white_listed = + socket (addr, port) control_events white_listed = (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) @@ -467,7 +470,7 @@ module Make (P: PARAMS) = struct some peers, so we accept this info *) debug "(%a) new peers received from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; - push (Peers peers) ; + let (_:bool) = Lwt_pipe.push_now control_events (Peers peers) in cancel () | Ok Disconnect -> debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" @@ -493,10 +496,14 @@ module Make (P: PARAMS) = struct let crypt buf = let nonce = get_nonce remote_nonce in Crypto_box.box my_secret_key public_key buf nonce in - let send p = send_msg ~crypt socket buf p >>= fun _ -> Lwt.return_unit in + let writer = Lwt_pipe.create 2 in + let send p = Lwt_pipe.push writer p in + let try_send p = Lwt_pipe.push_now writer p in + let reader = Lwt_pipe.create 2 in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; - listening_port ; version ; last_seen ; disconnect ; send } in + listening_port ; version ; last_seen ; + disconnect ; send ; try_send ; reader ; writer } in let uncrypt buf = let nonce = get_nonce local_nonce in match Crypto_box.box_open my_secret_key public_key buf nonce with @@ -517,15 +524,25 @@ module Make (P: PARAMS) = struct debug "(%a) disconnected (by peer) %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; cancel () - | Ok Bootstrap -> push (Bootstrap peer) ; receiver () - | Ok Advertise peers -> push (Peers peers) ; receiver () - | Ok Message msg -> push (Recv (peer, msg)) ; receiver () + | Ok Bootstrap -> Lwt_pipe.push reader (Bootstrap peer) >>= receiver + | Ok Advertise peers -> Lwt_pipe.push reader (Peers peers) >>= receiver + | Ok Message msg -> Lwt_pipe.push reader (Recv (peer, msg)) >>= receiver + in + let rec sender () = + Lwt_pipe.pop peer.writer >>= fun msg -> + send_msg ~crypt socket buf msg >>= function + | Ok _nb_sent -> + sender () + | Error err -> + debug "(%a) error sending to %a: %a" + pp_gid my_gid pp_gid gid Error_monad.pp_print_error err ; + cancel () in (* Events for the main worker *) - push (Connected peer) ; - on_cancel (fun () -> push (Disconnected peer) ; Lwt.return_unit) ; - (* Launch the worker *) - receiver () + Lwt_pipe.push control_events (Connected peer) >>= fun () -> + on_cancel (fun () -> Lwt_pipe.push control_events (Disconnected peer)) ; + (* Launch the workers *) + Lwt.join [receiver () ; sender ()] in let buf = MBytes.create maxlen in on_cancel (fun () -> @@ -727,20 +744,10 @@ module Make (P: PARAMS) = struct Sys.(set_signal sigpipe Signal_ignore) ; (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in - (* create the internal event queue *) - let enqueue_event, dequeue_event = - let queue, enqueue = Lwt_stream.create () in - (fun msg -> enqueue (Some msg)), - (fun () -> Lwt_stream.next queue) - in - (* create the external message queue *) - let enqueue_msg, dequeue_msg, close_msg_queue = - let queue, enqueue = Lwt_stream.create () in - (fun msg -> enqueue (Some msg)), - (fun () -> Lwt_stream.next queue), - (fun () -> enqueue None) - in - on_cancel (fun () -> close_msg_queue () ; Lwt.return_unit) ; + (* create the internal event pipe *) + let events = Lwt_pipe.create 100 in + (* create the external message pipe *) + let messages = Lwt_pipe.create 100 in (* fill the known peers pools from last time *) Data_encoding.Json.read_file config.peers_file >>= fun res -> let known_peers, black_list, my_gid, @@ -884,8 +891,8 @@ module Make (P: PARAMS) = struct match addr with | LU.ADDR_INET (addr, port) -> let addr = Ipaddr_unix.of_inet_addr addr in - enqueue_event (Contact ((addr, port), socket)) ; - step () + Lwt_pipe.push events (Contact ((addr, port), socket)) >>= + step | _ -> Lwt.async (fun () -> LU.close socket) ; step () @@ -961,7 +968,8 @@ module Make (P: PARAMS) = struct ] >>= fun () -> debug "(%a) connected to %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port; - enqueue_event (Contact ((addr, port), socket)) ; + Lwt_pipe.push events + (Contact ((addr, port), socket)) >>= fun () -> Lwt.return (nb - 1) end (fun exn -> @@ -1029,10 +1037,33 @@ module Make (P: PARAMS) = struct (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) (50, []) |> snd in + let rec available_events () = + let peers = PeerMap.bindings !connected in + let current_peers_evts = + List.map (fun (_, gid, p) -> + Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader) + peers + in + Lwt.choose [ + (LC.wait new_peer >>= fun _p -> available_events ()); + Lwt.nchoose @@ + (Lwt_pipe.values_available events >|= fun () -> None, events) :: current_peers_evts + ] + in + let rec choose_event () = + available_events () >>= fun evts -> + let nb_evts = List.length evts in + let gid, evtqueue = List.nth evts (Random.int nb_evts) in + begin match gid with + | None -> lwt_debug "(%a) Processing event from main" pp_gid my_gid + | Some remote_gid -> lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid remote_gid + end >|= fun () -> + Lwt_pipe.pop_now_exn evtqueue + in (* main internal event handling worker *) let rec main () = Lwt.pick - [ dequeue_event () ; + [ choose_event () ; cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> match event with | Disconnected peer -> @@ -1120,7 +1151,7 @@ module Make (P: PARAMS) = struct let canceler = connect_to_peer config limits my_gid my_public_key my_secret_key my_proof_of_work - socket (addr, port) enqueue_event white_listed in + socket (addr, port) events white_listed in debug "(%a) incoming peer @@ %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; incoming := PointMap.add (addr, port) canceler !incoming ; @@ -1130,8 +1161,8 @@ module Make (P: PARAMS) = struct Lwt.async (fun () -> peer.send (Advertise sample)) ; main () | Recv (peer, msg) -> - enqueue_msg (peer, msg) ; - main () + Lwt_pipe.push messages (peer, msg) >>= + main | Peers peers -> List.iter (fun point -> @@ -1202,13 +1233,11 @@ module Make (P: PARAMS) = struct | None -> true with Not_found -> true) then (* either reply by a list of peer or connect if we need peers *) if PeerMap.cardinal !connected >= limits.expected_connections then begin - enqueue_event (Peers [ addr, port ]) ; + Lwt_pipe.push events (Peers [ addr, port ]) >>= fun () -> send_msg socket buf (Advertise (bootstrap_peers ())) >>= fun _ -> LU.close socket - end else begin - enqueue_event (Contact ((addr, port), socket)) ; - Lwt.return_unit - end + end else + Lwt_pipe.push events (Contact ((addr, port), socket)) else LU.close socket in Lwt_utils.worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) @@ -1260,11 +1289,11 @@ module Make (P: PARAMS) = struct version = peer.version ; } and recv_from () = - dequeue_msg () + Lwt_pipe.pop messages and send_to peer msg = - peer.send (Message msg) >>= fun _ -> Lwt.return_unit + peer.send (Message msg) and try_send_to peer msg = - Lwt.async (fun () -> peer.send (Message msg)); true + peer.try_send (Message msg) and broadcast msg = PeerMap.iter (fun _ _ peer -> @@ -1333,7 +1362,7 @@ module Make (P: PARAMS) = struct let gid = String.make 16 '\000' in let infinity, wakeup = Lwt.wait () in let shutdown () = - Lwt.wakeup_exn wakeup Lwt_stream.Empty; + Lwt.wakeup_exn wakeup Queue.Empty; Lwt.return_unit in let peers () = [] in let find_peer _ = None in From 158447416b80a09b6a259a8b5f21431118fe243f Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 23:57:13 +0100 Subject: [PATCH 08/14] Shell: Count sent and received bytes in `P2p`. --- src/node/net/p2p.ml | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 870ff96ba..0f9ac312f 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -272,6 +272,10 @@ module Make (P: PARAMS) = struct try_send : msg -> bool ; reader : event Lwt_pipe.t ; writer : msg Lwt_pipe.t ; + total_sent : unit -> int ; + total_received : unit -> int ; + inflow : unit -> float ; + outflow : unit -> float ; } type peer_info = { @@ -401,10 +405,11 @@ module Make (P: PARAMS) = struct (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) - let recv ?uncrypt buf = + let recv ?received ?uncrypt buf = Lwt.pick [ recv_msg ?uncrypt socket buf ; (cancelation () >>= fun () -> Error_monad.fail Canceled) ] - >>=? fun (_size, message) -> + >>=? fun (size, message) -> + Utils.iter_option received ~f:(fun r -> r := !r + size) ; return message in (* First step: send and receive credentials, makes no difference whether we're trying to connect to a peer or checking an incoming @@ -480,12 +485,27 @@ module Make (P: PARAMS) = struct debug "(%a) connection rejected (bad connection request) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () + (* Them we can build the net object and launch the worker. *) and connected buf local_nonce version gid public_key nonce listening_port = + let feed_ma ?(freq=1.) ma counter = + let rec inner old_received = + Lwt_unix.sleep freq >>= fun () -> + let received = !counter in + ma#add_int (received - old_received); + inner received in + Lwt.async (fun () -> inner !counter) + in (* net object state *) let last = ref (Unix.gettimeofday ()) in let local_nonce = ref local_nonce in let remote_nonce = ref nonce in + let received = ref 0 in + let sent = ref 0 in + let received_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in + let sent_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in + feed_ma received_ema received ; + feed_ma sent_ema sent ; (* net object callbaks *) let last_seen () = !last in let get_nonce nonce = @@ -500,10 +520,15 @@ module Make (P: PARAMS) = struct let send p = Lwt_pipe.push writer p in let try_send p = Lwt_pipe.push_now writer p in let reader = Lwt_pipe.create 2 in + let total_sent () = !sent in + let total_received () = !received in + let inflow () = received_ema#get in + let outflow () = sent_ema#get in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; listening_port ; version ; last_seen ; - disconnect ; send ; try_send ; reader ; writer } in + disconnect ; send ; try_send ; reader ; writer ; + total_sent ; total_received ; inflow ; outflow } in let uncrypt buf = let nonce = get_nonce local_nonce in match Crypto_box.box_open my_secret_key public_key buf nonce with @@ -514,7 +539,7 @@ module Make (P: PARAMS) = struct | Some _ as res -> res in (* The message reception loop. *) let rec receiver () = - recv ~uncrypt buf >>= function + recv ~received ~uncrypt buf >>= function | Error err -> debug "(%a) error receiving: %a" pp_gid my_gid Error_monad.pp_print_error err ; From 997d1972e083dc1bdea6a1dbbcb73877198e41f4 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Tue, 29 Nov 2016 00:03:49 +0100 Subject: [PATCH 09/14] Test: update `test-p2p` --- test/test_p2p.ml | 177 +++++++++++++++++++++++++++++++---------------- 1 file changed, 118 insertions(+), 59 deletions(-) diff --git a/test/test_p2p.ml b/test/test_p2p.ml index e4f682711..bf2170083 100644 --- a/test/test_p2p.ml +++ b/test/test_p2p.ml @@ -4,13 +4,41 @@ open P2p include Logging.Make (struct let name = "test-p2p" end) module Param = struct - type msg = unit - let encodings = [Encoding { tag = 0x10; - encoding = Data_encoding.null; - wrap = (fun () -> ()); - unwrap = (fun () -> Some ()); - max_length = Some 0; - }] + + let dump_encoding = Data_encoding.(Variable.list (tup2 string string)) + + type msg = + | Create of string * string + | Update of string * string + | Delete of string + | Dump of (string * string) list + + let encodings = [ + Encoding { tag = 0x10; + encoding = Data_encoding.(tup2 string string); + wrap = (function (k, v) -> Create (k, v)); + unwrap = (function Create (k, v) -> Some (k, v) | _ -> None); + max_length = Some 0x400; + }; + Encoding { tag = 0x11; + encoding = Data_encoding.(tup2 string string); + wrap = (function (k, v) -> Update (k, v)); + unwrap = (function Create (k, v) -> Some (k, v) | _ -> None); + max_length = Some 0x400; + }; + Encoding { tag = 0x12; + encoding = Data_encoding.string; + wrap = (function x -> Delete x); + unwrap = (function Delete x -> Some x | _ -> None); + max_length = Some 0x400; + }; + Encoding { tag = 0x13; + encoding = dump_encoding; + wrap = (function x -> Dump x); + unwrap = (function Dump x -> Some x | _ -> None); + max_length = Some 0x10000; + }; + ] type metadata = unit let initial_metadata = () @@ -20,82 +48,113 @@ module Param = struct let supported_versions = [ { name = "TEST"; major = 0; minor = 0; } ] end -let peer_of_string peer = - let p = String.rindex peer ':' in - let addr = String.sub peer 0 p in - let port = String.(sub peer (p+1) (length peer - p - 1)) in - Ipaddr.of_string_exn addr, int_of_string port +module Net = Make(Param) -include Make(Param) - -let print_peer_info { gid; addr; port; version = { name; major; minor } } = +let print_peer_info { Net.gid; addr; port; version = { name; major; minor } } = Printf.sprintf "%s:%d (%s.%d.%d)" (Ipaddr.to_string addr) port name major minor -let peers_file = ref @@ Filename.temp_file "p2p-test" "" +let string_of_gid gid = Format.asprintf "%a" pp_gid gid + +let net_monitor config limits num_nets net = + let my_gid_str = string_of_gid @@ Net.gid net in + let send_msgs_to_neighbours neighbours = + Lwt_list.iter_p begin fun p -> + let { Net.gid } = Net.peer_info net p in + let remote_gid_str = string_of_gid gid in + Net.send net p (Create (my_gid_str, remote_gid_str)) >>= fun _ -> + lwt_log_notice "(%s) Done sending msg to %s" my_gid_str remote_gid_str + end neighbours >>= fun () -> + lwt_log_notice "(%s) Done sending all msgs." my_gid_str + in + let rec inner () = + let neighbours = Net.peers net in + let nb_neighbours = List.length neighbours in + if nb_neighbours < num_nets - 1 then begin + log_notice "(%s) I have %d peers" my_gid_str nb_neighbours; + Lwt_unix.sleep 1. >>= inner end + else begin + log_notice "(%s) I know all my %d peers" my_gid_str nb_neighbours; + Lwt.async (fun () -> send_msgs_to_neighbours neighbours); + let rec recv_peer_msgs acc = + if List.length acc = num_nets - 1 then begin + ListLabels.iter acc ~f:(fun (k, v) -> log_info "%s %s" k v); + Lwt.return_unit + end + else begin + lwt_log_notice "(%s) recv_peers_msgs: Got %d, need %d" + my_gid_str (List.length acc) (num_nets - 1) >>= fun () -> + Net.recv net >>= function + | p, (Create (their_gid, my_gid)) -> + lwt_log_notice "(%s) Got a message from %s" my_gid_str their_gid >>= fun () -> + recv_peer_msgs ((their_gid, my_gid) :: acc) + | _ -> assert false + end + in + recv_peer_msgs [] + end + in inner () + +let range n = + let rec inner acc = function + | -1 -> acc + | n -> inner (n :: acc) (pred n) + in + if n < 0 then invalid_arg "range" + else inner [] (pred n) let main () = let incoming_port = ref @@ Some 11732 in - let discovery_port = ref None in - let known_peers = ref [] in + let discovery_port = ref @@ Some 10732 in let closed_network = ref false in - let max_message_size = ref 1024 in + let max_packet_size = ref 1024 in let peer_answer_timeout = ref 10. in - let expected_connections = ref 1 in - let min_connections = ref 0 in - let max_connections = ref 10 in let blacklist_time = ref 100. in + let num_networks = ref 0 in + let make_net nb_neighbours n = + let config = { + incoming_port = Utils.map_option !incoming_port ~f:(fun p -> p + n); + discovery_port = !discovery_port; + known_peers = []; + peers_file = ""; + closed_network = !closed_network; + } + in + let limits = { + max_message_size = !max_packet_size; + peer_answer_timeout = !peer_answer_timeout; + expected_connections = nb_neighbours; + min_connections = nb_neighbours; + max_connections = nb_neighbours; + blacklist_time = !blacklist_time; + } + in + Net.bootstrap ~config ~limits >|= fun net -> + config, limits, net + in let spec = Arg.[ - "-iport", Int (fun p -> incoming_port := Some p), " Incoming port"; + "-start-port", Int (fun p -> incoming_port := Some p), " Incoming port"; "-dport", Int (fun p -> discovery_port := Some p), " Discovery port"; - "-peers-file", Set_string peers_file, " Peers filepath"; "-closed", Set closed_network, " Closed network mode"; - "-max-message-size", Set_int max_message_size, "int Max size of messages"; + "-max-packet-size", Set_int max_packet_size, "int Max size of packets"; "-peer-answer-timeout", Set_float peer_answer_timeout, "float Number of seconds"; - "-expected-connections", Set_int expected_connections, "conns Expected connections"; - "-min-connections", Set_int min_connections, "conns Minimal number of connections"; - "-max-connections", Set_int max_connections, "conns num of connections"; "-blacklist-time", Set_float blacklist_time, "float Number of seconds"; "-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs"; "-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs"; ] in - let anon_fun peer = known_peers := peer_of_string peer :: !known_peers in - let usage_msg = "Test P2p. Arguments are:" in + let anon_fun num_peers = num_networks := int_of_string num_peers in + let usage_msg = "Usage: %s .\nArguments are:" in Arg.parse spec anon_fun usage_msg; - let config = { - incoming_port = !incoming_port; - discovery_port = !discovery_port; - known_peers = !known_peers; - peers_file = !peers_file; - closed_network = !closed_network; - } - in - let limits = { - max_message_size = !max_message_size; - peer_answer_timeout = !peer_answer_timeout; - expected_connections = !expected_connections; - min_connections = !min_connections; - max_connections = !max_connections; - blacklist_time = !blacklist_time; - } - in - bootstrap ~config ~limits >>= fun net -> - let rec loop () = - ListLabels.iter (peers net) ~f:begin fun p -> - let pi = peer_info net p in - log_info "%s" (print_peer_info pi) - end; - Lwt_unix.sleep 3. >>= - loop - in - loop () + let nets = range !num_networks in + Lwt_list.map_p (make_net (pred !num_networks)) nets >>= fun nets -> + Lwt_list.iter_p (fun (cfg, limits, net) -> net_monitor cfg limits !num_networks net) nets >>= fun () -> + lwt_log_notice "All done!" let () = Sys.catch_break true; try Lwt_main.run @@ main () - with _ -> - Sys.remove !peers_file + with _ -> () From 5f8b74e96c889f71a573232c9693c1030fc0dd68 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Wed, 30 Nov 2016 14:48:21 +0100 Subject: [PATCH 10/14] P2P: Cancel MA computations on client disconnect --- src/node/net/p2p.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 0f9ac312f..5eb0fb4ca 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -494,7 +494,7 @@ module Make (P: PARAMS) = struct let received = !counter in ma#add_int (received - old_received); inner received in - Lwt.async (fun () -> inner !counter) + Lwt.async (fun () -> Lwt.pick [cancelation (); inner !counter]) in (* net object state *) let last = ref (Unix.gettimeofday ()) in From e1d6df6e99699334de35c7784bd95cfc6928f70c Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Wed, 30 Nov 2016 14:49:08 +0100 Subject: [PATCH 11/14] P2P: Do not accept unauthenticated Advertise msgs --- src/node/net/p2p.ml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 5eb0fb4ca..ed8cc3ab5 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -470,12 +470,9 @@ module Make (P: PARAMS) = struct buf local_nonce version gid public_key message_nonce listening_port end - | Ok (Advertise peers) -> - (* alternatively, one can refuse a connection but reply with - some peers, so we accept this info *) - debug "(%a) new peers received from %a:%d" + | Ok (Advertise _) -> + debug "(%a) connection rejected (unauthenticated Advertise) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; - let (_:bool) = Lwt_pipe.push_now control_events (Peers peers) in cancel () | Ok Disconnect -> debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" From a832c2069fd3a0bc6e07238fa75e76af4ee9e2f5 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Wed, 30 Nov 2016 15:04:06 +0100 Subject: [PATCH 12/14] P2P: Do not log ``ordinary'' errors --- src/node/net/p2p.ml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index ed8cc3ab5..2165830f5 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -424,11 +424,13 @@ module Make (P: PARAMS) = struct port = config.incoming_port ; versions = P.supported_versions }) >>= fun _ -> Lwt.pick - [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> - Error_monad.fail Timeout ) ; + [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> Error_monad.fail Timeout ) ; recv buf ] >>= function + | Error [Timeout] | Error [Canceled] | Error [Exn End_of_file] -> + (* Expected errors. No logging. *) + cancel () | Error err -> - debug "(%a) error receiving from %a:%d: %a" + log_error "(%a) error receiving from %a:%d: %a" pp_gid my_gid Ipaddr.pp_hum addr port Error_monad.pp_print_error err ; cancel () | Ok (Connect { gid; port = listening_port; versions ; From bdb2d20f0586a0084124adec3925e5576e5b19a7 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Wed, 30 Nov 2016 16:29:11 +0100 Subject: [PATCH 13/14] P2P: Introduce a worker dedicated to user events --- src/node/net/p2p.ml | 78 +++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 2165830f5..0244f1b91 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -1061,35 +1061,44 @@ module Make (P: PARAMS) = struct (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) (50, []) |> snd in - let rec available_events () = - let peers = PeerMap.bindings !connected in - let current_peers_evts = - List.map (fun (_, gid, p) -> - Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader) - peers + let next_peer_event () = + let rec peer_events () = + let peers = PeerMap.bindings !connected in + let current_peers_evts = + List.map begin function + | _, Some gid, p -> Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader + | _ -> Lwt_utils.never_ending + end peers + in + Lwt.choose [ + (LC.wait new_peer >>= fun _p -> peer_events ()); + Lwt.nchoose current_peers_evts; + ] in - Lwt.choose [ - (LC.wait new_peer >>= fun _p -> available_events ()); - Lwt.nchoose @@ - (Lwt_pipe.values_available events >|= fun () -> None, events) :: current_peers_evts - ] - in - let rec choose_event () = - available_events () >>= fun evts -> + peer_events () >>= fun evts -> let nb_evts = List.length evts in let gid, evtqueue = List.nth evts (Random.int nb_evts) in - begin match gid with - | None -> lwt_debug "(%a) Processing event from main" pp_gid my_gid - | Some remote_gid -> lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid remote_gid - end >|= fun () -> + lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid gid >|= fun () -> Lwt_pipe.pop_now_exn evtqueue in - (* main internal event handling worker *) - let rec main () = + let rec peers () = + (* user event handling worker *) + Lwt.pick [ + next_peer_event () ; + cancelation () >>= fun () -> Lwt.return Shutdown ; + ] >>= fun event -> match event with + | Recv (peer, msg) -> Lwt_pipe.push messages (peer, msg) >>= peers + | msg -> Lwt_pipe.push events msg >>= peers + in + (* internal event handling worker *) + let rec admin () = Lwt.pick - [ choose_event () ; + [ Lwt_pipe.pop events ; cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> match event with + | Recv _ -> + (* Invariant broken *) + Lwt.fail_with "admin: got a Recv message (broken invariant)" | Disconnected peer -> debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; (* remove it from the tables *) @@ -1097,7 +1106,7 @@ module Make (P: PARAMS) = struct if PeerMap.cardinal !connected < limits.min_connections then LC.broadcast too_few_peers () ; incoming := PointMap.remove peer.point !incoming ; - main () + admin () | Connected peer -> incoming := PointMap.remove peer.point !incoming ; let update_infos () = @@ -1162,7 +1171,7 @@ module Make (P: PARAMS) = struct LC.broadcast too_many_peers () ; LC.broadcast new_peer peer end ; - main () + admin () | Contact ((addr, port), socket) -> (* we do not check the credentials at this stage, since they could change from one connection to the next *) @@ -1170,7 +1179,7 @@ module Make (P: PARAMS) = struct || PeerMap.mem_by_point (addr, port) !connected || BlackList.mem addr !black_list then LU.close socket >>= fun () -> - main () + admin () else let canceler = connect_to_peer @@ -1179,14 +1188,11 @@ module Make (P: PARAMS) = struct debug "(%a) incoming peer @@ %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; incoming := PointMap.add (addr, port) canceler !incoming ; - main () + admin () | Bootstrap peer -> let sample = bootstrap_peers () in Lwt.async (fun () -> peer.send (Advertise sample)) ; - main () - | Recv (peer, msg) -> - Lwt_pipe.push messages (peer, msg) >>= - main + admin () | Peers peers -> List.iter (fun point -> @@ -1199,7 +1205,7 @@ module Make (P: PARAMS) = struct known_peers := PeerMap.update point source !known_peers ; LC.broadcast new_contact point) peers ; - main () + admin () | Shutdown -> Lwt.return_unit in @@ -1233,10 +1239,14 @@ module Make (P: PARAMS) = struct Lwt_utils.worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in - let main = + let peers_worker = Lwt_utils.worker - (Format.asprintf "(%a) reception" pp_gid my_gid) - main cancel in + (Format.asprintf "(%a) peers" pp_gid my_gid) + peers cancel in + let admin = + Lwt_utils.worker + (Format.asprintf "(%a) admin" pp_gid my_gid) + admin cancel in let unblock = Lwt_utils.worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) @@ -1282,7 +1292,7 @@ module Make (P: PARAMS) = struct (* stop accepting clients *) cancel () >>= fun () -> (* wait for both workers to end *) - Lwt.join [ welcome ; main ; maintenance ; unblock ; + Lwt.join [ welcome ; peers_worker ; admin ; maintenance ; unblock ; discovery_answerer ; discovery_sender ] >>= fun () -> (* properly shutdown all peers *) let cancelers = From 1ffe2db277453f205b576f07d3a566eade52b8f8 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Thu, 1 Dec 2016 13:39:59 +0100 Subject: [PATCH 14/14] P2P: Minor changes --- src/node/net/p2p.ml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 0244f1b91..90bca1bf8 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -426,12 +426,16 @@ module Make (P: PARAMS) = struct Lwt.pick [ ( LU.sleep limits.peer_answer_timeout >>= fun () -> Error_monad.fail Timeout ) ; recv buf ] >>= function - | Error [Timeout] | Error [Canceled] | Error [Exn End_of_file] -> - (* Expected errors. No logging. *) + | Error [Timeout] + | Error [Canceled] + | Error [Exn End_of_file] -> + debug "(%a) Closed connection to %a:%d." + pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () | Error err -> log_error "(%a) error receiving from %a:%d: %a" - pp_gid my_gid Ipaddr.pp_hum addr port Error_monad.pp_print_error err ; + pp_gid my_gid Ipaddr.pp_hum addr port + Error_monad.pp_print_error err ; cancel () | Ok (Connect { gid; port = listening_port; versions ; public_key ; proof_of_work ; message_nonce }) -> @@ -472,10 +476,6 @@ module Make (P: PARAMS) = struct buf local_nonce version gid public_key message_nonce listening_port end - | Ok (Advertise _) -> - debug "(%a) connection rejected (unauthenticated Advertise) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () | Ok Disconnect -> debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; @@ -1065,9 +1065,9 @@ module Make (P: PARAMS) = struct let rec peer_events () = let peers = PeerMap.bindings !connected in let current_peers_evts = - List.map begin function - | _, Some gid, p -> Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader - | _ -> Lwt_utils.never_ending + filter_map begin function + | _, Some gid, p -> Some (Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader) + | _ -> None end peers in Lwt.choose [