diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 491e04526..870ff96ba 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -269,6 +269,9 @@ module Make (P: PARAMS) = struct last_seen : unit -> float ; disconnect : unit -> unit Lwt.t; send : msg -> unit Lwt.t ; + try_send : msg -> bool ; + reader : event Lwt_pipe.t ; + writer : msg Lwt_pipe.t ; } type peer_info = { @@ -394,7 +397,7 @@ module Make (P: PARAMS) = struct canceler. *) let connect_to_peer config limits my_gid my_public_key my_secret_key my_proof_of_work - socket (addr, port) push white_listed = + socket (addr, port) control_events white_listed = (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) @@ -467,7 +470,7 @@ module Make (P: PARAMS) = struct some peers, so we accept this info *) debug "(%a) new peers received from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; - push (Peers peers) ; + let (_:bool) = Lwt_pipe.push_now control_events (Peers peers) in cancel () | Ok Disconnect -> debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" @@ -493,10 +496,14 @@ module Make (P: PARAMS) = struct let crypt buf = let nonce = get_nonce remote_nonce in Crypto_box.box my_secret_key public_key buf nonce in - let send p = send_msg ~crypt socket buf p >>= fun _ -> Lwt.return_unit in + let writer = Lwt_pipe.create 2 in + let send p = Lwt_pipe.push writer p in + let try_send p = Lwt_pipe.push_now writer p in + let reader = Lwt_pipe.create 2 in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; - listening_port ; version ; last_seen ; disconnect ; send } in + listening_port ; version ; last_seen ; + disconnect ; send ; try_send ; reader ; writer } in let uncrypt buf = let nonce = get_nonce local_nonce in match Crypto_box.box_open my_secret_key public_key buf nonce with @@ -517,15 +524,25 @@ module Make (P: PARAMS) = struct debug "(%a) disconnected (by peer) %a @@ %a:%d" pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; cancel () - | Ok Bootstrap -> push (Bootstrap peer) ; receiver () - | Ok Advertise peers -> push (Peers peers) ; receiver () - | Ok Message msg -> push (Recv (peer, msg)) ; receiver () + | Ok Bootstrap -> Lwt_pipe.push reader (Bootstrap peer) >>= receiver + | Ok Advertise peers -> Lwt_pipe.push reader (Peers peers) >>= receiver + | Ok Message msg -> Lwt_pipe.push reader (Recv (peer, msg)) >>= receiver + in + let rec sender () = + Lwt_pipe.pop peer.writer >>= fun msg -> + send_msg ~crypt socket buf msg >>= function + | Ok _nb_sent -> + sender () + | Error err -> + debug "(%a) error sending to %a: %a" + pp_gid my_gid pp_gid gid Error_monad.pp_print_error err ; + cancel () in (* Events for the main worker *) - push (Connected peer) ; - on_cancel (fun () -> push (Disconnected peer) ; Lwt.return_unit) ; - (* Launch the worker *) - receiver () + Lwt_pipe.push control_events (Connected peer) >>= fun () -> + on_cancel (fun () -> Lwt_pipe.push control_events (Disconnected peer)) ; + (* Launch the workers *) + Lwt.join [receiver () ; sender ()] in let buf = MBytes.create maxlen in on_cancel (fun () -> @@ -727,20 +744,10 @@ module Make (P: PARAMS) = struct Sys.(set_signal sigpipe Signal_ignore) ; (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in - (* create the internal event queue *) - let enqueue_event, dequeue_event = - let queue, enqueue = Lwt_stream.create () in - (fun msg -> enqueue (Some msg)), - (fun () -> Lwt_stream.next queue) - in - (* create the external message queue *) - let enqueue_msg, dequeue_msg, close_msg_queue = - let queue, enqueue = Lwt_stream.create () in - (fun msg -> enqueue (Some msg)), - (fun () -> Lwt_stream.next queue), - (fun () -> enqueue None) - in - on_cancel (fun () -> close_msg_queue () ; Lwt.return_unit) ; + (* create the internal event pipe *) + let events = Lwt_pipe.create 100 in + (* create the external message pipe *) + let messages = Lwt_pipe.create 100 in (* fill the known peers pools from last time *) Data_encoding.Json.read_file config.peers_file >>= fun res -> let known_peers, black_list, my_gid, @@ -884,8 +891,8 @@ module Make (P: PARAMS) = struct match addr with | LU.ADDR_INET (addr, port) -> let addr = Ipaddr_unix.of_inet_addr addr in - enqueue_event (Contact ((addr, port), socket)) ; - step () + Lwt_pipe.push events (Contact ((addr, port), socket)) >>= + step | _ -> Lwt.async (fun () -> LU.close socket) ; step () @@ -961,7 +968,8 @@ module Make (P: PARAMS) = struct ] >>= fun () -> debug "(%a) connected to %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port; - enqueue_event (Contact ((addr, port), socket)) ; + Lwt_pipe.push events + (Contact ((addr, port), socket)) >>= fun () -> Lwt.return (nb - 1) end (fun exn -> @@ -1029,10 +1037,33 @@ module Make (P: PARAMS) = struct (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) (50, []) |> snd in + let rec available_events () = + let peers = PeerMap.bindings !connected in + let current_peers_evts = + List.map (fun (_, gid, p) -> + Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader) + peers + in + Lwt.choose [ + (LC.wait new_peer >>= fun _p -> available_events ()); + Lwt.nchoose @@ + (Lwt_pipe.values_available events >|= fun () -> None, events) :: current_peers_evts + ] + in + let rec choose_event () = + available_events () >>= fun evts -> + let nb_evts = List.length evts in + let gid, evtqueue = List.nth evts (Random.int nb_evts) in + begin match gid with + | None -> lwt_debug "(%a) Processing event from main" pp_gid my_gid + | Some remote_gid -> lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid remote_gid + end >|= fun () -> + Lwt_pipe.pop_now_exn evtqueue + in (* main internal event handling worker *) let rec main () = Lwt.pick - [ dequeue_event () ; + [ choose_event () ; cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> match event with | Disconnected peer -> @@ -1120,7 +1151,7 @@ module Make (P: PARAMS) = struct let canceler = connect_to_peer config limits my_gid my_public_key my_secret_key my_proof_of_work - socket (addr, port) enqueue_event white_listed in + socket (addr, port) events white_listed in debug "(%a) incoming peer @@ %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; incoming := PointMap.add (addr, port) canceler !incoming ; @@ -1130,8 +1161,8 @@ module Make (P: PARAMS) = struct Lwt.async (fun () -> peer.send (Advertise sample)) ; main () | Recv (peer, msg) -> - enqueue_msg (peer, msg) ; - main () + Lwt_pipe.push messages (peer, msg) >>= + main | Peers peers -> List.iter (fun point -> @@ -1202,13 +1233,11 @@ module Make (P: PARAMS) = struct | None -> true with Not_found -> true) then (* either reply by a list of peer or connect if we need peers *) if PeerMap.cardinal !connected >= limits.expected_connections then begin - enqueue_event (Peers [ addr, port ]) ; + Lwt_pipe.push events (Peers [ addr, port ]) >>= fun () -> send_msg socket buf (Advertise (bootstrap_peers ())) >>= fun _ -> LU.close socket - end else begin - enqueue_event (Contact ((addr, port), socket)) ; - Lwt.return_unit - end + end else + Lwt_pipe.push events (Contact ((addr, port), socket)) else LU.close socket in Lwt_utils.worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) @@ -1260,11 +1289,11 @@ module Make (P: PARAMS) = struct version = peer.version ; } and recv_from () = - dequeue_msg () + Lwt_pipe.pop messages and send_to peer msg = - peer.send (Message msg) >>= fun _ -> Lwt.return_unit + peer.send (Message msg) and try_send_to peer msg = - Lwt.async (fun () -> peer.send (Message msg)); true + peer.try_send (Message msg) and broadcast msg = PeerMap.iter (fun _ _ peer -> @@ -1333,7 +1362,7 @@ module Make (P: PARAMS) = struct let gid = String.make 16 '\000' in let infinity, wakeup = Lwt.wait () in let shutdown () = - Lwt.wakeup_exn wakeup Lwt_stream.Empty; + Lwt.wakeup_exn wakeup Queue.Empty; Lwt.return_unit in let peers () = [] in let find_peer _ = None in