Shell: use bounded Lwt_pipe in P2p

This commit is contained in:
Vincent Bernardoff 2016-11-28 23:31:40 +01:00 committed by Grégoire Henry
parent dc2084d993
commit 56a58cc962

View File

@ -269,6 +269,9 @@ module Make (P: PARAMS) = struct
last_seen : unit -> float ;
disconnect : unit -> unit Lwt.t;
send : msg -> unit Lwt.t ;
try_send : msg -> bool ;
reader : event Lwt_pipe.t ;
writer : msg Lwt_pipe.t ;
}
type peer_info = {
@ -394,7 +397,7 @@ module Make (P: PARAMS) = struct
canceler. *)
let connect_to_peer
config limits my_gid my_public_key my_secret_key my_proof_of_work
socket (addr, port) push white_listed =
socket (addr, port) control_events white_listed =
(* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
(* a cancelable encrypted reception *)
@ -467,7 +470,7 @@ module Make (P: PARAMS) = struct
some peers, so we accept this info *)
debug "(%a) new peers received from %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ;
push (Peers peers) ;
let (_:bool) = Lwt_pipe.push_now control_events (Peers peers) in
cancel ()
| Ok Disconnect ->
debug "(%a) connection rejected (closed by peer or timeout) from %a:%d"
@ -493,10 +496,14 @@ module Make (P: PARAMS) = struct
let crypt buf =
let nonce = get_nonce remote_nonce in
Crypto_box.box my_secret_key public_key buf nonce in
let send p = send_msg ~crypt socket buf p >>= fun _ -> Lwt.return_unit in
let writer = Lwt_pipe.create 2 in
let send p = Lwt_pipe.push writer p in
let try_send p = Lwt_pipe.push_now writer p in
let reader = Lwt_pipe.create 2 in
(* net object construction *)
let peer = { gid ; public_key ; point = (addr, port) ;
listening_port ; version ; last_seen ; disconnect ; send } in
listening_port ; version ; last_seen ;
disconnect ; send ; try_send ; reader ; writer } in
let uncrypt buf =
let nonce = get_nonce local_nonce in
match Crypto_box.box_open my_secret_key public_key buf nonce with
@ -517,15 +524,25 @@ module Make (P: PARAMS) = struct
debug "(%a) disconnected (by peer) %a @@ %a:%d"
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
cancel ()
| Ok Bootstrap -> push (Bootstrap peer) ; receiver ()
| Ok Advertise peers -> push (Peers peers) ; receiver ()
| Ok Message msg -> push (Recv (peer, msg)) ; receiver ()
| Ok Bootstrap -> Lwt_pipe.push reader (Bootstrap peer) >>= receiver
| Ok Advertise peers -> Lwt_pipe.push reader (Peers peers) >>= receiver
| Ok Message msg -> Lwt_pipe.push reader (Recv (peer, msg)) >>= receiver
in
let rec sender () =
Lwt_pipe.pop peer.writer >>= fun msg ->
send_msg ~crypt socket buf msg >>= function
| Ok _nb_sent ->
sender ()
| Error err ->
debug "(%a) error sending to %a: %a"
pp_gid my_gid pp_gid gid Error_monad.pp_print_error err ;
cancel ()
in
(* Events for the main worker *)
push (Connected peer) ;
on_cancel (fun () -> push (Disconnected peer) ; Lwt.return_unit) ;
(* Launch the worker *)
receiver ()
Lwt_pipe.push control_events (Connected peer) >>= fun () ->
on_cancel (fun () -> Lwt_pipe.push control_events (Disconnected peer)) ;
(* Launch the workers *)
Lwt.join [receiver () ; sender ()]
in
let buf = MBytes.create maxlen in
on_cancel (fun () ->
@ -727,20 +744,10 @@ module Make (P: PARAMS) = struct
Sys.(set_signal sigpipe Signal_ignore) ;
(* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
(* create the internal event queue *)
let enqueue_event, dequeue_event =
let queue, enqueue = Lwt_stream.create () in
(fun msg -> enqueue (Some msg)),
(fun () -> Lwt_stream.next queue)
in
(* create the external message queue *)
let enqueue_msg, dequeue_msg, close_msg_queue =
let queue, enqueue = Lwt_stream.create () in
(fun msg -> enqueue (Some msg)),
(fun () -> Lwt_stream.next queue),
(fun () -> enqueue None)
in
on_cancel (fun () -> close_msg_queue () ; Lwt.return_unit) ;
(* create the internal event pipe *)
let events = Lwt_pipe.create 100 in
(* create the external message pipe *)
let messages = Lwt_pipe.create 100 in
(* fill the known peers pools from last time *)
Data_encoding.Json.read_file config.peers_file >>= fun res ->
let known_peers, black_list, my_gid,
@ -884,8 +891,8 @@ module Make (P: PARAMS) = struct
match addr with
| LU.ADDR_INET (addr, port) ->
let addr = Ipaddr_unix.of_inet_addr addr in
enqueue_event (Contact ((addr, port), socket)) ;
step ()
Lwt_pipe.push events (Contact ((addr, port), socket)) >>=
step
| _ ->
Lwt.async (fun () -> LU.close socket) ;
step ()
@ -961,7 +968,8 @@ module Make (P: PARAMS) = struct
] >>= fun () ->
debug "(%a) connected to %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port;
enqueue_event (Contact ((addr, port), socket)) ;
Lwt_pipe.push events
(Contact ((addr, port), socket)) >>= fun () ->
Lwt.return (nb - 1)
end
(fun exn ->
@ -1029,10 +1037,33 @@ module Make (P: PARAMS) = struct
(fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l))
(50, []) |> snd
in
let rec available_events () =
let peers = PeerMap.bindings !connected in
let current_peers_evts =
List.map (fun (_, gid, p) ->
Lwt_pipe.values_available p.reader >|= fun () -> gid, p.reader)
peers
in
Lwt.choose [
(LC.wait new_peer >>= fun _p -> available_events ());
Lwt.nchoose @@
(Lwt_pipe.values_available events >|= fun () -> None, events) :: current_peers_evts
]
in
let rec choose_event () =
available_events () >>= fun evts ->
let nb_evts = List.length evts in
let gid, evtqueue = List.nth evts (Random.int nb_evts) in
begin match gid with
| None -> lwt_debug "(%a) Processing event from main" pp_gid my_gid
| Some remote_gid -> lwt_debug "(%a) Processing event from %a" pp_gid my_gid pp_gid remote_gid
end >|= fun () ->
Lwt_pipe.pop_now_exn evtqueue
in
(* main internal event handling worker *)
let rec main () =
Lwt.pick
[ dequeue_event () ;
[ choose_event () ;
cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event ->
match event with
| Disconnected peer ->
@ -1120,7 +1151,7 @@ module Make (P: PARAMS) = struct
let canceler =
connect_to_peer
config limits my_gid my_public_key my_secret_key my_proof_of_work
socket (addr, port) enqueue_event white_listed in
socket (addr, port) events white_listed in
debug "(%a) incoming peer @@ %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ;
incoming := PointMap.add (addr, port) canceler !incoming ;
@ -1130,8 +1161,8 @@ module Make (P: PARAMS) = struct
Lwt.async (fun () -> peer.send (Advertise sample)) ;
main ()
| Recv (peer, msg) ->
enqueue_msg (peer, msg) ;
main ()
Lwt_pipe.push messages (peer, msg) >>=
main
| Peers peers ->
List.iter
(fun point ->
@ -1202,13 +1233,11 @@ module Make (P: PARAMS) = struct
| None -> true with Not_found -> true) then
(* either reply by a list of peer or connect if we need peers *)
if PeerMap.cardinal !connected >= limits.expected_connections then begin
enqueue_event (Peers [ addr, port ]) ;
Lwt_pipe.push events (Peers [ addr, port ]) >>= fun () ->
send_msg socket buf (Advertise (bootstrap_peers ())) >>= fun _ ->
LU.close socket
end else begin
enqueue_event (Contact ((addr, port), socket)) ;
Lwt.return_unit
end
end else
Lwt_pipe.push events (Contact ((addr, port), socket))
else LU.close socket in
Lwt_utils.worker
(Format.asprintf "(%a) discovery answerer" pp_gid my_gid)
@ -1260,11 +1289,11 @@ module Make (P: PARAMS) = struct
version = peer.version ;
}
and recv_from () =
dequeue_msg ()
Lwt_pipe.pop messages
and send_to peer msg =
peer.send (Message msg) >>= fun _ -> Lwt.return_unit
peer.send (Message msg)
and try_send_to peer msg =
Lwt.async (fun () -> peer.send (Message msg)); true
peer.try_send (Message msg)
and broadcast msg =
PeerMap.iter
(fun _ _ peer ->
@ -1333,7 +1362,7 @@ module Make (P: PARAMS) = struct
let gid = String.make 16 '\000' in
let infinity, wakeup = Lwt.wait () in
let shutdown () =
Lwt.wakeup_exn wakeup Lwt_stream.Empty;
Lwt.wakeup_exn wakeup Queue.Empty;
Lwt.return_unit in
let peers () = [] in
let find_peer _ = None in