P2P: Introduce a worker dedicated to user events

This commit is contained in:
Vincent Bernardoff 2016-11-30 16:29:11 +01:00 committed by Grégoire Henry
parent a832c2069f
commit bdb2d20f05

View File

@ -1061,35 +1061,44 @@ module Make (P: PARAMS) = struct
(fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l))
(50, []) |> snd
in
let rec available_events () =
let next_peer_event () =
let rec peer_events () =
let peers = PeerMap.bindings !connected in
let current_peers_evts =
List.map (fun (_, gid, p) ->
Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader)
peers
List.map begin function
| _, Some gid, p -> Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader
| _ -> Lwt_utils.never_ending
end peers
in
Lwt.choose [
(LC.wait new_peer >>= fun _p -> available_events ());
Lwt.nchoose @@
(Lwt_pipe.values_available events >|= fun () -> None, events) :: current_peers_evts
(LC.wait new_peer >>= fun _p -> peer_events ());
Lwt.nchoose current_peers_evts;
]
in
let rec choose_event () =
available_events () >>= fun evts ->
peer_events () >>= fun evts ->
let nb_evts = List.length evts in
let gid, evtqueue = List.nth evts (Random.int nb_evts) in
begin match gid with
| None -> lwt_debug "(%a) Processing event from main" pp_gid my_gid
| Some remote_gid -> lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid remote_gid
end >|= fun () ->
lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid gid >|= fun () ->
Lwt_pipe.pop_now_exn evtqueue
in
(* main internal event handling worker *)
let rec main () =
let rec peers () =
(* user event handling worker *)
Lwt.pick [
next_peer_event () ;
cancelation () >>= fun () -> Lwt.return Shutdown ;
] >>= fun event -> match event with
| Recv (peer, msg) -> Lwt_pipe.push messages (peer, msg) >>= peers
| msg -> Lwt_pipe.push events msg >>= peers
in
(* internal event handling worker *)
let rec admin () =
Lwt.pick
[ choose_event () ;
[ Lwt_pipe.pop events ;
cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event ->
match event with
| Recv _ ->
(* Invariant broken *)
Lwt.fail_with "admin: got a Recv message (broken invariant)"
| Disconnected peer ->
debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ;
(* remove it from the tables *)
@ -1097,7 +1106,7 @@ module Make (P: PARAMS) = struct
if PeerMap.cardinal !connected < limits.min_connections then
LC.broadcast too_few_peers () ;
incoming := PointMap.remove peer.point !incoming ;
main ()
admin ()
| Connected peer ->
incoming := PointMap.remove peer.point !incoming ;
let update_infos () =
@ -1162,7 +1171,7 @@ module Make (P: PARAMS) = struct
LC.broadcast too_many_peers () ;
LC.broadcast new_peer peer
end ;
main ()
admin ()
| Contact ((addr, port), socket) ->
(* we do not check the credentials at this stage, since they
could change from one connection to the next *)
@ -1170,7 +1179,7 @@ module Make (P: PARAMS) = struct
|| PeerMap.mem_by_point (addr, port) !connected
|| BlackList.mem addr !black_list then
LU.close socket >>= fun () ->
main ()
admin ()
else
let canceler =
connect_to_peer
@ -1179,14 +1188,11 @@ module Make (P: PARAMS) = struct
debug "(%a) incoming peer @@ %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ;
incoming := PointMap.add (addr, port) canceler !incoming ;
main ()
admin ()
| Bootstrap peer ->
let sample = bootstrap_peers () in
Lwt.async (fun () -> peer.send (Advertise sample)) ;
main ()
| Recv (peer, msg) ->
Lwt_pipe.push messages (peer, msg) >>=
main
admin ()
| Peers peers ->
List.iter
(fun point ->
@ -1199,7 +1205,7 @@ module Make (P: PARAMS) = struct
known_peers := PeerMap.update point source !known_peers ;
LC.broadcast new_contact point)
peers ;
main ()
admin ()
| Shutdown ->
Lwt.return_unit
in
@ -1233,10 +1239,14 @@ module Make (P: PARAMS) = struct
Lwt_utils.worker
(Format.asprintf "(%a) maintenance" pp_gid my_gid)
maintenance cancel in
let main =
let peers_worker =
Lwt_utils.worker
(Format.asprintf "(%a) reception" pp_gid my_gid)
main cancel in
(Format.asprintf "(%a) peers" pp_gid my_gid)
peers cancel in
let admin =
Lwt_utils.worker
(Format.asprintf "(%a) admin" pp_gid my_gid)
admin cancel in
let unblock =
Lwt_utils.worker
(Format.asprintf "(%a) unblacklister" pp_gid my_gid)
@ -1282,7 +1292,7 @@ module Make (P: PARAMS) = struct
(* stop accepting clients *)
cancel () >>= fun () ->
(* wait for both workers to end *)
Lwt.join [ welcome ; main ; maintenance ; unblock ;
Lwt.join [ welcome ; peers_worker ; admin ; maintenance ; unblock ;
discovery_answerer ; discovery_sender ] >>= fun () ->
(* properly shutdown all peers *)
let cancelers =