From bdb2d20f0586a0084124adec3925e5576e5b19a7 Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Wed, 30 Nov 2016 16:29:11 +0100 Subject: [PATCH] P2P: Introduce a worker dedicated to user events --- src/node/net/p2p.ml | 78 +++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 2165830f5..0244f1b91 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -1061,35 +1061,44 @@ module Make (P: PARAMS) = struct (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) (50, []) |> snd in - let rec available_events () = - let peers = PeerMap.bindings !connected in - let current_peers_evts = - List.map (fun (_, gid, p) -> - Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader) - peers + let next_peer_event () = + let rec peer_events () = + let peers = PeerMap.bindings !connected in + let current_peers_evts = + List.map begin function + | _, Some gid, p -> Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader + | _ -> Lwt_utils.never_ending + end peers + in + Lwt.choose [ + (LC.wait new_peer >>= fun _p -> peer_events ()); + Lwt.nchoose current_peers_evts; + ] in - Lwt.choose [ - (LC.wait new_peer >>= fun _p -> available_events ()); - Lwt.nchoose @@ - (Lwt_pipe.values_available events >|= fun () -> None, events) :: current_peers_evts - ] - in - let rec choose_event () = - available_events () >>= fun evts -> + peer_events () >>= fun evts -> let nb_evts = List.length evts in let gid, evtqueue = List.nth evts (Random.int nb_evts) in - begin match gid with - | None -> lwt_debug "(%a) Processing event from main" pp_gid my_gid - | Some remote_gid -> lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid remote_gid - end >|= fun () -> + lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid gid >|= fun () -> Lwt_pipe.pop_now_exn evtqueue in - (* main internal event handling worker *) - let rec main () = + let rec peers () = + (* user event handling worker *) + Lwt.pick [ + next_peer_event () ; + cancelation () >>= fun () -> Lwt.return Shutdown ; + ] >>= fun event -> match event with + | Recv (peer, msg) -> Lwt_pipe.push messages (peer, msg) >>= peers + | msg -> Lwt_pipe.push events msg >>= peers + in + (* internal event handling worker *) + let rec admin () = Lwt.pick - [ choose_event () ; + [ Lwt_pipe.pop events ; cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event -> match event with + | Recv _ -> + (* Invariant broken *) + Lwt.fail_with "admin: got a Recv message (broken invariant)" | Disconnected peer -> debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; (* remove it from the tables *) @@ -1097,7 +1106,7 @@ module Make (P: PARAMS) = struct if PeerMap.cardinal !connected < limits.min_connections then LC.broadcast too_few_peers () ; incoming := PointMap.remove peer.point !incoming ; - main () + admin () | Connected peer -> incoming := PointMap.remove peer.point !incoming ; let update_infos () = @@ -1162,7 +1171,7 @@ module Make (P: PARAMS) = struct LC.broadcast too_many_peers () ; LC.broadcast new_peer peer end ; - main () + admin () | Contact ((addr, port), socket) -> (* we do not check the credentials at this stage, since they could change from one connection to the next *) @@ -1170,7 +1179,7 @@ module Make (P: PARAMS) = struct || PeerMap.mem_by_point (addr, port) !connected || BlackList.mem addr !black_list then LU.close socket >>= fun () -> - main () + admin () else let canceler = connect_to_peer @@ -1179,14 +1188,11 @@ module Make (P: PARAMS) = struct debug "(%a) incoming peer @@ %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; incoming := PointMap.add (addr, port) canceler !incoming ; - main () + admin () | Bootstrap peer -> let sample = bootstrap_peers () in Lwt.async (fun () -> peer.send (Advertise sample)) ; - main () - | Recv (peer, msg) -> - Lwt_pipe.push messages (peer, msg) >>= - main + admin () | Peers peers -> List.iter (fun point -> @@ -1199,7 +1205,7 @@ module Make (P: PARAMS) = struct known_peers := PeerMap.update point source !known_peers ; LC.broadcast new_contact point) peers ; - main () + admin () | Shutdown -> Lwt.return_unit in @@ -1233,10 +1239,14 @@ module Make (P: PARAMS) = struct Lwt_utils.worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in - let main = + let peers_worker = Lwt_utils.worker - (Format.asprintf "(%a) reception" pp_gid my_gid) - main cancel in + (Format.asprintf "(%a) peers" pp_gid my_gid) + peers cancel in + let admin = + Lwt_utils.worker + (Format.asprintf "(%a) admin" pp_gid my_gid) + admin cancel in let unblock = Lwt_utils.worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) @@ -1282,7 +1292,7 @@ module Make (P: PARAMS) = struct (* stop accepting clients *) cancel () >>= fun () -> (* wait for both workers to end *) - Lwt.join [ welcome ; main ; maintenance ; unblock ; + Lwt.join [ welcome ; peers_worker ; admin ; maintenance ; unblock ; discovery_answerer ; discovery_sender ] >>= fun () -> (* properly shutdown all peers *) let cancelers =