Shell: minor cosmetics in p2p.ml

This commit is contained in:
Vincent Bernardoff 2016-11-28 21:35:14 +01:00 committed by Grégoire Henry
parent 3cb307eeff
commit fdff344989

View File

@ -9,8 +9,8 @@
module LU = Lwt_unix module LU = Lwt_unix
module LC = Lwt_condition module LC = Lwt_condition
open Lwt
open Lwt_utils open Lwt.Infix
open Logging.Net open Logging.Net
(* public types *) (* public types *)
@ -150,18 +150,22 @@ module Make (P: PARAMS) = struct
(obj6 (obj6
(req "gid" (Fixed.string gid_length)) (req "gid" (Fixed.string gid_length))
(req "port" uint16) (req "port" uint16)
(req "pubKey" Crypto_box.public_key_encoding) (req "pubkey" Crypto_box.public_key_encoding)
(req "proof_of_work" Crypto_box.nonce_encoding) (req "proof_of_work" Crypto_box.nonce_encoding)
(req "message_nonce" Crypto_box.nonce_encoding) (req "message_nonce" Crypto_box.nonce_encoding)
(req "versions" (Variable.list version_encoding))) (req "versions" (Variable.list version_encoding)))
(function (function
| Connect { gid ; port ; versions ; public_key ; proof_of_work; message_nonce } -> | Connect { gid ; port ; public_key ;
proof_of_work ; message_nonce ; versions } ->
let port = match port with None -> 0 | Some port -> port in let port = match port with None -> 0 | Some port -> port in
Some (gid, port, public_key, proof_of_work, message_nonce, versions) Some (gid, port, public_key,
proof_of_work, message_nonce, versions)
| _ -> None) | _ -> None)
(fun (gid, port, public_key, proof_of_work, message_nonce, versions) -> (fun (gid, port, public_key,
let port = if port = 0 then None else Some port in proof_of_work, message_nonce, versions) ->
Connect { gid ; port ; versions ; public_key ; proof_of_work; message_nonce }); let port = if port = 0 then None else Some port in
Connect { gid ; port ; versions ;
public_key ; proof_of_work ; message_nonce });
case ~tag:0x01 null case ~tag:0x01 null
(function Disconnect -> Some () | _ -> None) (function Disconnect -> Some () | _ -> None)
(fun () -> Disconnect); (fun () -> Disconnect);
@ -183,67 +187,78 @@ module Make (P: PARAMS) = struct
(* read a message from a TCP socket *) (* read a message from a TCP socket *)
let recv_msg ?(uncrypt = (fun buf -> Some buf)) fd buf = let recv_msg ?(uncrypt = (fun buf -> Some buf)) fd buf =
catch Lwt.catch begin fun () ->
(fun () -> assert (MBytes.length buf >= 2 lsl 16) ;
assert (MBytes.length buf >= 2 lsl 16) ; Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () ->
Lwt_utils.read_mbytes ~len:hdrlen fd buf >>= fun () -> let len = EndianBigstring.BigEndian.get_uint16 buf 0 in
let len = EndianBigstring.BigEndian.get_uint16 buf 0 in (* TODO timeout read ??? *)
(* TODO timeout read ??? *) Lwt_utils.read_mbytes ~len fd buf >>= fun () ->
Lwt_utils.read_mbytes ~len fd buf >>= fun () -> let buf = MBytes.sub buf 0 len in
let buf = MBytes.sub buf 0 len in match uncrypt buf with
match uncrypt buf with | None ->
| None -> (* TODO track invalid message *)
(* TODO track invalid message *) Lwt.return Disconnect
return Disconnect | Some buf ->
| Some buf -> match Data_encoding.Binary.of_bytes msg_encoding buf with
match Data_encoding.Binary.of_bytes msg_encoding buf with | None ->
| None -> (* TODO track invalid message *)
(* TODO track invalid message *) Lwt.return Disconnect
return Disconnect | Some msg ->
| Some msg -> Lwt.return msg
Lwt.return msg) end
(function (function
| Unix.Unix_error _ | End_of_file -> return Disconnect | Unix.Unix_error _ | End_of_file -> Lwt.return Disconnect
| e -> fail e) | e -> Lwt.fail e)
(* send a message over a TCP socket *) (* send a message over a TCP socket *)
let send_msg ?crypt fd buf msg = let send_msg ?crypt fd buf msg =
catch Lwt.catch begin fun () ->
(fun () -> match Data_encoding.Binary.write msg_encoding msg buf hdrlen with
match Data_encoding.Binary.write msg_encoding msg buf hdrlen with | None -> Lwt.return_false
| None -> return_false | Some len ->
| Some len -> match crypt with
match crypt with | None ->
| None -> if len > maxlen then
if len > maxlen then Lwt.return_false
return_false else begin
else begin EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ;
EndianBigstring.BigEndian.set_int16 buf 0 (len - hdrlen) ; (* TODO timeout write ??? *)
(* TODO timeout write ??? *) Lwt_utils.write_mbytes ~len fd buf >>= fun () ->
Lwt_utils.write_mbytes ~len fd buf >>= fun () -> Lwt.return_true
return true end
end | Some crypt ->
| Some crypt -> let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in
let encbuf = crypt (MBytes.sub buf hdrlen (len - hdrlen)) in let len = MBytes.length encbuf in
let len = MBytes.length encbuf in if len > maxlen then
if len > maxlen then Lwt.return_false
return_false else begin
else begin let lenbuf = MBytes.create 2 in
let lenbuf = MBytes.create 2 in EndianBigstring.BigEndian.set_int16 lenbuf 0 len ;
EndianBigstring.BigEndian.set_int16 lenbuf 0 len ; Lwt_utils.write_mbytes fd lenbuf >>= fun () ->
Lwt_utils.write_mbytes fd lenbuf >>= fun () -> Lwt_utils.write_mbytes fd encbuf >>= fun () ->
Lwt_utils.write_mbytes fd encbuf >>= fun () -> Lwt.return_true
return true end
end) end
(function (function
| Unix.Unix_error _ | End_of_file -> return_false | Unix.Unix_error _ | End_of_file -> Lwt.return_false
| e -> fail e) | e -> Lwt.fail e)
(* The (internal) type of network events, those dispatched from peer
workers to the net and others internal to net workers. *)
type event =
| Disconnected of peer
| Bootstrap of peer
| Recv of peer * P.msg
| Peers of point list
| Contact of point * LU.file_descr
| Connected of peer
| Shutdown
(* A peer handle, as a record-encoded object, abstract from the (* A peer handle, as a record-encoded object, abstract from the
outside world. A hidden Lwt worker is associated to a peer at its outside world. A hidden Lwt worker is associated to a peer at its
creation and is killed using the disconnect callback by net creation and is killed using the disconnect callback by net
workers (on shutdown of during maintenance). *) workers (on shutdown of during maintenance). *)
type peer = { and peer = {
gid : gid ; gid : gid ;
public_key : Crypto_box.public_key ; public_key : Crypto_box.public_key ;
point : point ; point : point ;
@ -281,17 +296,6 @@ module Make (P: PARAMS) = struct
get_metadata : gid -> P.metadata option ; get_metadata : gid -> P.metadata option ;
} }
(* The (internal) type of network events, those dispatched from peer
workers to the net and others internal to net workers. *)
type event =
| Disconnected of peer
| Bootstrap of peer
| Recv of peer * P.msg
| Peers of point list
| Contact of point * LU.file_descr
| Connected of peer
| Shutdown
(* Run-time point-or-gid indexed storage, one point is bound to at (* Run-time point-or-gid indexed storage, one point is bound to at
most one gid, which is the invariant we want to keep both for the most one gid, which is the invariant we want to keep both for the
connected peers table and the known peers one *) connected peers table and the known peers one *)
@ -389,11 +393,11 @@ module Make (P: PARAMS) = struct
config limits my_gid my_public_key my_secret_key my_proof_of_work config limits my_gid my_public_key my_secret_key my_proof_of_work
socket (addr, port) push white_listed = socket (addr, port) push white_listed =
(* a non exception-based cancelation mechanism *) (* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = canceler () in let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
(* a cancelable encrypted reception *) (* a cancelable encrypted reception *)
let recv ~uncrypt buf = let recv ~uncrypt buf =
pick [ recv_msg ~uncrypt socket buf ; Lwt.pick [ recv_msg ~uncrypt socket buf ;
(cancelation () >>= fun () -> return Disconnect) ] in (cancelation () >>= fun () -> Lwt.return Disconnect) ] in
(* First step: send and receive credentials, makes no difference (* First step: send and receive credentials, makes no difference
whether we're trying to connect to a peer or checking an incoming whether we're trying to connect to a peer or checking an incoming
connection, both parties must first present themselves. *) connection, both parties must first present themselves. *)
@ -406,41 +410,49 @@ module Make (P: PARAMS) = struct
message_nonce = local_nonce ; message_nonce = local_nonce ;
port = config.incoming_port ; port = config.incoming_port ;
versions = P.supported_versions }) >>= fun _ -> versions = P.supported_versions }) >>= fun _ ->
pick [ (LU.sleep limits.peer_answer_timeout >>= fun () -> return Disconnect) ; Lwt.pick
recv_msg socket buf ] >>= function [ ( LU.sleep limits.peer_answer_timeout >>= fun () ->
| Connect { gid; port = listening_port; versions ; public_key ; proof_of_work ; message_nonce } -> Lwt.return Disconnect ) ;
recv_msg socket buf ] >>= function
| Connect { gid; port = listening_port; versions ; public_key ;
proof_of_work ; message_nonce } ->
debug "(%a) connection requested from %a @@ %a:%d" debug "(%a) connection requested from %a @@ %a:%d"
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
let work_proved = let work_proved =
Crypto_box.check_proof_of_work Crypto_box.check_proof_of_work
public_key proof_of_work Crypto_box.default_target in public_key proof_of_work Crypto_box.default_target in
if not work_proved then if not work_proved then begin
begin debug "connection rejected (invalid proof of work)" ;
debug "connection rejected (invalid proof of work)";
cancel () cancel ()
end end else begin
else match common_version P.supported_versions versions with
begin match common_version P.supported_versions versions with
| None -> | None ->
debug "(%a) connection rejected (incompatible versions) from %a:%d" debug
"(%a) connection rejected (incompatible versions) from %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ; pp_gid my_gid Ipaddr.pp_hum addr port ;
cancel () cancel ()
| Some version -> | Some version ->
if config.closed_network then if config.closed_network then
match listening_port with match listening_port with
| Some port when white_listed (addr, port) -> | Some port when white_listed (addr, port) ->
connected buf local_nonce version gid public_key message_nonce listening_port connected
buf local_nonce version gid
public_key message_nonce listening_port
| Some port -> | Some port ->
debug "(%a) connection rejected (out of the closed network) from %a:%d" debug
"(%a) connection rejected (out of the closed network) from %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ; pp_gid my_gid Ipaddr.pp_hum addr port ;
cancel () cancel ()
| None -> | None ->
debug "(%a) connection rejected (out of the closed network) from %a:unknown" debug
"(%a) connection rejected (out of the closed network) from %a:unknown"
pp_gid my_gid Ipaddr.pp_hum addr ; pp_gid my_gid Ipaddr.pp_hum addr ;
cancel () cancel ()
else else
connected buf local_nonce version gid public_key message_nonce listening_port connected
end buf local_nonce version gid
public_key message_nonce listening_port
end
| Advertise peers -> | Advertise peers ->
(* alternatively, one can refuse a connection but reply with (* alternatively, one can refuse a connection but reply with
some peers, so we accept this info *) some peers, so we accept this info *)
@ -472,7 +484,7 @@ module Make (P: PARAMS) = struct
let crypt buf = let crypt buf =
let nonce = get_nonce remote_nonce in let nonce = get_nonce remote_nonce in
Crypto_box.box my_secret_key public_key buf nonce in Crypto_box.box my_secret_key public_key buf nonce in
let send p = send_msg ~crypt socket buf p >>= fun _ -> return () in let send p = send_msg ~crypt socket buf p >>= fun _ -> Lwt.return_unit in
(* net object construction *) (* net object construction *)
let peer = { gid ; public_key ; point = (addr, port) ; let peer = { gid ; public_key ; point = (addr, port) ;
listening_port ; version ; last_seen ; disconnect ; send } in listening_port ; version ; last_seen ; disconnect ; send } in
@ -500,7 +512,7 @@ module Make (P: PARAMS) = struct
in in
(* Events for the main worker *) (* Events for the main worker *)
push (Connected peer) ; push (Connected peer) ;
on_cancel (fun () -> push (Disconnected peer) ; return ()) ; on_cancel (fun () -> push (Disconnected peer) ; Lwt.return_unit) ;
(* Launch the worker *) (* Launch the worker *)
receiver () receiver ()
in in
@ -508,12 +520,13 @@ module Make (P: PARAMS) = struct
on_cancel (fun () -> on_cancel (fun () ->
(* send_msg ~crypt socket buf Disconnect >>= fun _ -> *) (* send_msg ~crypt socket buf Disconnect >>= fun _ -> *)
LU.close socket >>= fun _ -> LU.close socket >>= fun _ ->
return ()) ; Lwt.return_unit) ;
let worker_name = let worker_name =
Format.asprintf Format.asprintf
"(%a) connection handler for %a:%d" "(%a) connection handler for %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port in pp_gid my_gid Ipaddr.pp_hum addr port in
ignore (worker ~safe:true worker_name ~run:(fun () -> connect buf) ~cancel) ; ignore (Lwt_utils.worker worker_name
~safe:true ~run:(fun () -> connect buf) ~cancel) ;
(* return the canceler *) (* return the canceler *)
cancel cancel
@ -523,7 +536,10 @@ module Make (P: PARAMS) = struct
let open Data_encoding in let open Data_encoding in
splitted splitted
~json: ~json:
(conv Ipaddr.to_string (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) string) (conv
Ipaddr.to_string
(Data_encoding.Json.wrap_error Ipaddr.of_string_exn)
string)
~binary: ~binary:
(union ~tag_size:`Uint8 (union ~tag_size:`Uint8
[ case ~tag:4 [ case ~tag:4
@ -619,50 +635,50 @@ module Make (P: PARAMS) = struct
callback to fill the answers and returns a canceler function *) callback to fill the answers and returns a canceler function *)
let discovery_answerer my_gid disco_port cancelation callback = let discovery_answerer my_gid disco_port cancelation callback =
(* init a UDP listening socket on the broadcast canal *) (* init a UDP listening socket on the broadcast canal *)
catch Lwt.catch begin fun () ->
(fun () -> let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in
let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in LU.(setsockopt main_socket SO_BROADCAST true) ;
LU.(setsockopt main_socket SO_BROADCAST true) ; LU.(setsockopt main_socket SO_REUSEADDR true) ;
LU.(setsockopt main_socket SO_REUSEADDR true) ; LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ;
LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; Lwt.return (Some main_socket)
return (Some main_socket)) end
(fun exn -> (fun exn ->
debug "(%a) will not listen to discovery requests (%s)" debug "(%a) will not listen to discovery requests (%s)"
pp_gid my_gid (string_of_unix_exn exn) ; pp_gid my_gid (string_of_unix_exn exn) ;
return None) >>= function Lwt.return_none) >>= function
| None -> return () | None -> Lwt.return_unit
| Some main_socket -> | Some main_socket ->
(* the answering function *) (* the answering function *)
let rec step () = let rec step () =
let buffer = discovery_message my_gid 0 in let buffer = discovery_message my_gid 0 in
let len = MBytes.length buffer in let len = MBytes.length buffer in
pick [ (cancelation () >>= fun () -> return None) ; Lwt.pick
(Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> [ (cancelation () >>= fun () -> Lwt.return_none) ;
return (Some r)) ] >>= function (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r ->
| Some (len', LU.ADDR_INET (addr, _)) -> Lwt.return (Some r)) ] >>= function
if len' <> len then | None -> Lwt.return_unit
step () (* drop bytes, better luck next time ! *) | Some (len', LU.ADDR_INET (addr, _)) when len' = len ->
else answerable_discovery_message
answerable_discovery_message (Data_encoding.Binary.of_bytes
(Data_encoding.Binary.of_bytes discovery_message_encoding buffer)
discovery_message_encoding buffer) my_gid
my_gid (fun _ port ->
(fun _ port -> Lwt.catch begin fun () ->
catch let ipaddr =
(fun () -> let open Ipaddr in
let ipaddr = Ipaddr_unix.of_inet_addr addr in match Ipaddr_unix.of_inet_addr addr with
let ipaddr = Ipaddr.(match ipaddr with V4 addr -> V6 (v6_of_v4 addr) | _ -> ipaddr) in | V4 addr -> V6 (v6_of_v4 addr)
let addr = Ipaddr_unix.to_inet_addr ipaddr in | V6 _ as addr -> addr in
let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in let addr = Ipaddr_unix.to_inet_addr ipaddr in
LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> let socket = LU.(socket PF_INET6 SOCK_STREAM 0) in
callback ipaddr port socket >>= fun () -> LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () ->
return ()) callback ipaddr port socket >>= fun () ->
(fun _ -> (* ignore errors *) return ()) >>= fun () -> Lwt.return_unit
step ()) end
step (fun _ -> (* ignore errors *) Lwt.return_unit) >>= fun () ->
| Some (_, _) -> step ())
step () step
| None -> return () | Some _ -> step ()
in step () in step ()
(* Sends dicover messages into space in an exponentially delayed loop, (* Sends dicover messages into space in an exponentially delayed loop,
@ -670,24 +686,28 @@ module Make (P: PARAMS) = struct
let discovery_sender my_gid disco_port inco_port cancelation restart = let discovery_sender my_gid disco_port inco_port cancelation restart =
let msg = discovery_message my_gid inco_port in let msg = discovery_message my_gid inco_port in
let rec loop delay n = let rec loop delay n =
catch Lwt.catch begin fun () ->
(fun () -> let socket = LU.(socket PF_INET SOCK_DGRAM 0) in
let socket = LU.(socket PF_INET SOCK_DGRAM 0) in LU.setsockopt socket LU.SO_BROADCAST true ;
LU.setsockopt socket LU.SO_BROADCAST true ; let broadcast_ipv4 = Unix.inet_addr_of_string "255.255.255.255" in
LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () -> LU.connect socket
Lwt_utils.write_mbytes socket msg >>= fun _ -> LU.(ADDR_INET (broadcast_ipv4, disco_port)) >>= fun () ->
LU.close socket) Lwt_utils.write_mbytes socket msg >>= fun _ ->
LU.close socket
end
(fun _ -> (fun _ ->
debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; debug "(%a) error broadcasting a discovery request" pp_gid my_gid ;
return ()) >>= fun () -> Lwt.return_unit) >>= fun () ->
pick [ (LU.sleep delay >>= fun () -> return (Some (delay, n + 1))) ; Lwt.pick
(cancelation () >>= fun () -> return None) ; [ (LU.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ;
(LC.wait restart >>= fun () -> return (Some (0.1, 0))) ] >>= function (cancelation () >>= fun () -> Lwt.return_none) ;
(LC.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ]
>>= function
| Some (delay, n) when n = 10 -> | Some (delay, n) when n = 10 ->
loop delay 9 loop delay 9
| Some (delay, n) -> | Some (delay, n) ->
loop (delay *. 2.) n loop (delay *. 2.) n
| None -> return () | None -> Lwt.return_unit
in loop 0.2 1 in loop 0.2 1
(* Main network creation and initialisation function *) (* Main network creation and initialisation function *)
@ -695,7 +715,7 @@ module Make (P: PARAMS) = struct
(* we need to ignore SIGPIPEs *) (* we need to ignore SIGPIPEs *)
Sys.(set_signal sigpipe Signal_ignore) ; Sys.(set_signal sigpipe Signal_ignore) ;
(* a non exception-based cancelation mechanism *) (* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = canceler () in let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
(* create the internal event queue *) (* create the internal event queue *)
let enqueue_event, dequeue_event = let enqueue_event, dequeue_event =
let queue, enqueue = Lwt_stream.create () in let queue, enqueue = Lwt_stream.create () in
@ -709,17 +729,19 @@ module Make (P: PARAMS) = struct
(fun () -> Lwt_stream.next queue), (fun () -> Lwt_stream.next queue),
(fun () -> enqueue None) (fun () -> enqueue None)
in in
on_cancel (fun () -> close_msg_queue () ; return ()) ; on_cancel (fun () -> close_msg_queue () ; Lwt.return_unit) ;
(* fill the known peers pools from last time *) (* fill the known peers pools from last time *)
Data_encoding.Json.read_file config.peers_file >>= fun res -> Data_encoding.Json.read_file config.peers_file >>= fun res ->
let known_peers, black_list, my_gid, my_public_key, my_secret_key, my_proof_of_work = let known_peers, black_list, my_gid,
my_public_key, my_secret_key, my_proof_of_work =
let init_peers () = let init_peers () =
let my_gid = let my_gid =
fresh_gid () in fresh_gid () in
let (my_secret_key, my_public_key) = let (my_secret_key, my_public_key) =
Crypto_box.random_keypair () in Crypto_box.random_keypair () in
let my_proof_of_work = let my_proof_of_work =
Crypto_box.generate_proof_of_work my_public_key Crypto_box.default_target in Crypto_box.generate_proof_of_work
my_public_key Crypto_box.default_target in
let known_peers = let known_peers =
let source = { unreachable_since = None ; let source = { unreachable_since = None ;
connections = None ; connections = None ;
@ -732,7 +754,8 @@ module Make (P: PARAMS) = struct
PeerMap.empty config.known_peers in PeerMap.empty config.known_peers in
let black_list = let black_list =
BlackList.empty in BlackList.empty in
known_peers, black_list, my_gid, my_public_key, my_secret_key, my_proof_of_work in known_peers, black_list, my_gid,
my_public_key, my_secret_key, my_proof_of_work in
match res with match res with
| None -> | None ->
let known_peers, black_list, my_gid, let known_peers, black_list, my_gid,
@ -809,36 +832,41 @@ module Make (P: PARAMS) = struct
in in
Data_encoding.Json.write_file config.peers_file json >>= fun _ -> Data_encoding.Json.write_file config.peers_file json >>= fun _ ->
debug "(%a) peer cache saved" pp_gid my_gid ; debug "(%a) peer cache saved" pp_gid my_gid ;
return ()) ; Lwt.return_unit) ;
(* storage of active and not yet active peers *) (* storage of active and not yet active peers *)
let incoming = ref PointMap.empty in let incoming = ref PointMap.empty in
let connected = ref PeerMap.empty in let connected = ref PeerMap.empty in
(* peer welcoming (accept) loop *) (* peer welcoming (accept) loop *)
let welcome () = let welcome () =
match config.incoming_port with match config.incoming_port with
| None -> (* no input port => no welcome worker *) return () | None -> (* no input port => no welcome worker *) Lwt.return_unit
| Some port -> | Some port ->
(* open port for incoming connexions *) (* open port for incoming connexions *)
let addr = Unix.inet6_addr_any in let addr = Unix.inet6_addr_any in
catch Lwt.catch begin fun () ->
(fun () -> let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in
let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in LU.(setsockopt main_socket SO_REUSEADDR true) ;
LU.(setsockopt main_socket SO_REUSEADDR true) ; LU.(bind main_socket (ADDR_INET (addr, port))) ;
LU.(bind main_socket (ADDR_INET (addr, port))) ; LU.listen main_socket limits.max_connections ;
LU.listen main_socket limits.max_connections ; Lwt.return (Some main_socket)
return (Some main_socket)) end
(fun exn -> (fun exn ->
debug "(%a) cannot accept incoming peers (%s)" debug "(%a) cannot accept incoming peers (%s)"
pp_gid my_gid (string_of_unix_exn exn) ; pp_gid my_gid (string_of_unix_exn exn) ;
return None)>>= function Lwt.return_none)
>>= function
| None -> | None ->
(* FIXME: run in degraded mode, better exit ? *) (* FIXME: run in degraded mode, better exit ? *)
return () Lwt.return_unit
| Some main_socket -> | Some main_socket ->
(* then loop *) (* then loop *)
let rec step () = let rec step () =
pick [ (LU.accept main_socket >>= fun (s, a) -> return (Some (s, a))) ; Lwt.pick
(cancelation () >>= fun _ -> return None) ] >>= function [ ( LU.accept main_socket >>= fun (s, a) ->
Lwt.return (Some (s, a)) ) ;
( cancelation () >>= fun _ ->
Lwt.return_none ) ]
>>= function
| None -> | None ->
LU.close main_socket LU.close main_socket
| Some (socket, addr) -> | Some (socket, addr) ->
@ -863,11 +891,17 @@ module Make (P: PARAMS) = struct
let just_maintained = LC.create () in let just_maintained = LC.create () in
(* maintenance worker, returns when [connections] peers are connected *) (* maintenance worker, returns when [connections] peers are connected *)
let rec maintenance () = let rec maintenance () =
pick [ (LU.sleep 120. >>= fun () -> return true) ; (* every two minutes *) Lwt.pick
(LC.wait please_maintain >>= fun () -> return true) ; (* when asked *) [ ( LU.sleep 120. >>= fun () ->
(LC.wait too_few_peers >>= fun () -> return true) ; (* limits *) Lwt.return_true) ; (* every two minutes *)
(LC.wait too_many_peers >>= fun () -> return true) ; ( LC.wait please_maintain >>= fun () ->
(cancelation () >>= fun () -> return false) ] >>= fun continue -> Lwt.return_true) ; (* when asked *)
( LC.wait too_few_peers >>= fun () ->
Lwt.return_true) ; (* limits *)
( LC.wait too_many_peers >>= fun () ->
Lwt.return_true) ;
( cancelation () >>= fun () ->
Lwt.return_false) ] >>= fun continue ->
let rec maintain () = let rec maintain () =
let n_connected = PeerMap.cardinal !connected in let n_connected = PeerMap.cardinal !connected in
if n_connected >= limits.expected_connections if n_connected >= limits.expected_connections
@ -895,24 +929,30 @@ module Make (P: PARAMS) = struct
not (PeerMap.mem_by_gid gid !connected)) in not (PeerMap.mem_by_gid gid !connected)) in
let rec do_contact_loop strec = let rec do_contact_loop strec =
match strec with match strec with
| 0, _ -> return true | 0, _ -> Lwt.return_true
| _, [] -> return false (* we didn't manage to contact enough peers *) | _, [] ->
Lwt.return_false (* we didn't manage to contact enough peers *)
| nb, ((addr, port), gid, source) :: tl -> | nb, ((addr, port), gid, source) :: tl ->
(* we try to open a connection *) (* we try to open a connection *)
let socket = LU.(socket (match addr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in let socket =
let open LU in
let open Ipaddr in
let family =
match addr with V4 _ -> PF_INET | V6 _ -> PF_INET6 in
socket family SOCK_STREAM 0 in
let uaddr = Ipaddr_unix.to_inet_addr addr in let uaddr = Ipaddr_unix.to_inet_addr addr in
catch Lwt.catch begin fun () ->
(fun () -> debug "(%a) trying to connect to %a:%d"
debug "(%a) trying to connect to %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ;
pp_gid my_gid Ipaddr.pp_hum addr port; Lwt.pick
Lwt.pick [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ;
[ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; LU.connect socket (LU.ADDR_INET (uaddr, port))
LU.connect socket (LU.ADDR_INET (uaddr, port)) ] >>= fun () ->
] >>= fun () -> debug "(%a) connected to %a:%d"
debug "(%a) connected to %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port;
pp_gid my_gid Ipaddr.pp_hum addr port; enqueue_event (Contact ((addr, port), socket)) ;
enqueue_event (Contact ((addr, port), socket)) ; Lwt.return (nb - 1)
return (nb - 1)) end
(fun exn -> (fun exn ->
debug "(%a) connection failed to %a:%d (%s)" debug "(%a) connection failed to %a:%d (%s)"
pp_gid my_gid Ipaddr.pp_hum addr port pp_gid my_gid Ipaddr.pp_hum addr port
@ -924,7 +964,7 @@ module Make (P: PARAMS) = struct
{ source with unreachable_since = Some now } { source with unreachable_since = Some now }
!known_peers ; !known_peers ;
LU.close socket >>= fun () -> LU.close socket >>= fun () ->
return nb) >>= fun nrec -> Lwt.return nb) >>= fun nrec ->
do_contact_loop (nrec, tl) do_contact_loop (nrec, tl)
in do_contact_loop (nb, contactable) in do_contact_loop (nb, contactable)
in in
@ -932,21 +972,25 @@ module Make (P: PARAMS) = struct
debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ; debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ;
contact to_contact >>= function contact to_contact >>= function
| true -> (* enough contacts, now wait for connections *) | true -> (* enough contacts, now wait for connections *)
pick [ (LC.wait new_peer >>= fun _ -> return true) ; Lwt.pick
(LU.sleep 1.0 >>= fun () -> return true) ; [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ;
(cancelation () >>= fun () -> return false) ] >>= fun continue -> (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ;
if continue then maintain () else return () (cancelation () >>= fun () -> Lwt.return_false) ]
>>= fun continue ->
if continue then maintain () else Lwt.return_unit
| false -> (* not enough contacts, ask the pals of our pals, | false -> (* not enough contacts, ask the pals of our pals,
discover the local network and then wait *) discover the local network and then wait *)
LC.broadcast restart_discovery () ; LC.broadcast restart_discovery () ;
(PeerMap.iter (PeerMap.iter
(fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap)) (fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap))
!connected ; !connected ;
pick [ (LC.wait new_peer >>= fun _ -> return true) ; Lwt.pick
(LC.wait new_contact >>= fun _ -> return true) ; [ (LC.wait new_peer >>= fun _ -> Lwt.return_true) ;
(LU.sleep 1.0 >>= fun () -> return true) ; (LC.wait new_contact >>= fun _ -> Lwt.return_true) ;
(cancelation () >>= fun () -> return false) ] >>= fun continue -> (LU.sleep 1.0 >>= fun () -> Lwt.return_true) ;
if continue then maintain () else return ()) (cancelation () >>= fun () -> Lwt.return_false) ]
>>= fun continue ->
if continue then maintain () else Lwt.return_unit)
else else
(* too many peers, start the russian roulette *) (* too many peers, start the russian roulette *)
let to_kill = n_connected - limits.max_connections in let to_kill = n_connected - limits.max_connections in
@ -955,13 +999,13 @@ module Make (P: PARAMS) = struct
(fun _ _ peer (i, t) -> (fun _ _ peer (i, t) ->
if i = 0 then (0, t) if i = 0 then (0, t)
else (i - 1, t >>= fun () -> peer.disconnect ())) else (i - 1, t >>= fun () -> peer.disconnect ()))
!connected (to_kill, return ())) >>= fun () -> !connected (to_kill, Lwt.return_unit)) >>= fun () ->
(* and directly skip to the next maintenance request *) (* and directly skip to the next maintenance request *)
LC.broadcast just_maintained () ; LC.broadcast just_maintained () ;
debug "(%a) maintenance step ended" pp_gid my_gid ; debug "(%a) maintenance step ended" pp_gid my_gid ;
maintenance () maintenance ()
in in
if continue then maintain () else return () if continue then maintain () else Lwt.return_unit
in in
(* select the peers to send on a bootstrap request *) (* select the peers to send on a bootstrap request *)
let bootstrap_peers () = let bootstrap_peers () =
@ -969,7 +1013,6 @@ module Make (P: PARAMS) = struct
PeerMap.bindings !known_peers |> PeerMap.bindings !known_peers |>
List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |> List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |>
List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |>
(* HERE *)
(* we simply send the first 50 (or less) known peers *) (* we simply send the first 50 (or less) known peers *)
List.fold_left List.fold_left
(fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l))
@ -977,8 +1020,9 @@ module Make (P: PARAMS) = struct
in in
(* main internal event handling worker *) (* main internal event handling worker *)
let rec main () = let rec main () =
pick [ dequeue_event () ; Lwt.pick
cancelation () >>= fun () -> return Shutdown ] >>= fun event -> [ dequeue_event () ;
cancelation () >>= fun () -> Lwt.return Shutdown ] >>= fun event ->
match event with match event with
| Disconnected peer -> | Disconnected peer ->
debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ;
@ -1002,7 +1046,8 @@ module Make (P: PARAMS) = struct
known_peers := PeerMap.remove_by_point point !known_peers ; known_peers := PeerMap.remove_by_point point !known_peers ;
known_peers := PeerMap.remove_by_gid peer.gid !known_peers ; known_peers := PeerMap.remove_by_gid peer.gid !known_peers ;
(* then assign *) (* then assign *)
known_peers := PeerMap.update point ~gid:peer.gid source !known_peers known_peers :=
PeerMap.update point ~gid:peer.gid source !known_peers
in update @@ in update @@
try match PeerMap.by_gid peer.gid !known_peers with try match PeerMap.by_gid peer.gid !known_peers with
| { connections = None ; white_listed } -> | { connections = None ; white_listed } ->
@ -1090,12 +1135,13 @@ module Make (P: PARAMS) = struct
peers ; peers ;
main () main ()
| Shutdown -> | Shutdown ->
return () Lwt.return_unit
in in
(* blacklist filter *) (* blacklist filter *)
let rec unblock () = let rec unblock () =
pick [ (Lwt_unix.sleep 20. >>= fun _ -> return true) ; Lwt.pick
(cancelation () >>= fun () -> return false) ] >>= fun continue -> [ (Lwt_unix.sleep 20. >>= fun _ -> Lwt.return_true) ;
(cancelation () >>= fun () -> Lwt.return_false) ] >>= fun continue ->
if continue then if continue then
let now = Unix.gettimeofday () in let now = Unix.gettimeofday () in
black_list := BlackList.fold black_list := BlackList.fold
@ -1110,20 +1156,33 @@ module Make (P: PARAMS) = struct
PeerMap.update point ?gid source map) PeerMap.update point ?gid source map)
!known_peers PeerMap.empty ; !known_peers PeerMap.empty ;
unblock () unblock ()
else return () else Lwt.return_unit
in in
(* launch all workers *) (* launch all workers *)
let welcome = worker (Format.asprintf "(%a) welcome" pp_gid my_gid) welcome cancel in let welcome =
let maintenance = worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in Lwt_utils.worker
let main = worker (Format.asprintf "(%a) reception" pp_gid my_gid) main cancel in (Format.asprintf "(%a) welcome" pp_gid my_gid)
let unblock = worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) unblock cancel in welcome cancel in
let maintenance =
Lwt_utils.worker
(Format.asprintf "(%a) maintenance" pp_gid my_gid)
maintenance cancel in
let main =
Lwt_utils.worker
(Format.asprintf "(%a) reception" pp_gid my_gid)
main cancel in
let unblock =
Lwt_utils.worker
(Format.asprintf "(%a) unblacklister" pp_gid my_gid)
unblock cancel in
let discovery_answerer = let discovery_answerer =
let buf = MBytes.create 0x100_000 in let buf = MBytes.create 0x100_000 in
match config.discovery_port with match config.discovery_port with
| Some disco_port -> | Some disco_port ->
let answerer () = let answerer () =
discovery_answerer my_gid disco_port cancelation @@ fun addr port socket -> discovery_answerer
(* do not reply to ourselves or conncted peers *) my_gid disco_port cancelation @@ fun addr port socket ->
(* do not reply to ourselves or connected peers *)
if not (PeerMap.mem_by_point (addr, port) !connected) if not (PeerMap.mem_by_point (addr, port) !connected)
&& (try match PeerMap.gid_by_point (addr, port) !known_peers with && (try match PeerMap.gid_by_point (addr, port) !known_peers with
| Some gid -> not (PeerMap.mem_by_gid gid !connected) | Some gid -> not (PeerMap.mem_by_gid gid !connected)
@ -1136,47 +1195,53 @@ module Make (P: PARAMS) = struct
LU.close socket LU.close socket
end else begin end else begin
enqueue_event (Contact ((addr, port), socket)) ; enqueue_event (Contact ((addr, port), socket)) ;
return () Lwt.return_unit
end end
else LU.close socket in else LU.close socket in
worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) answerer cancel Lwt_utils.worker
| _ -> return () in (Format.asprintf "(%a) discovery answerer" pp_gid my_gid)
answerer cancel
| _ -> Lwt.return_unit in
let discovery_sender = let discovery_sender =
match config.incoming_port, config.discovery_port with match config.incoming_port, config.discovery_port with
| Some inco_port, Some disco_port -> | Some inco_port, Some disco_port ->
let sender () = let sender () =
discovery_sender my_gid disco_port inco_port cancelation restart_discovery in discovery_sender
worker (Format.asprintf "(%a) discovery sender" pp_gid my_gid) sender cancel my_gid disco_port inco_port cancelation restart_discovery in
| _ -> return () in Lwt_utils.worker
(Format.asprintf "(%a) discovery sender" pp_gid my_gid)
sender cancel
| _ -> Lwt.return_unit in
(* net manipulation callbacks *) (* net manipulation callbacks *)
let rec shutdown () = let rec shutdown () =
debug "(%a) starting network shutdown" pp_gid my_gid ; debug "(%a) starting network shutdown" pp_gid my_gid ;
(* stop accepting clients *) (* stop accepting clients *)
cancel () >>= fun () -> cancel () >>= fun () ->
(* wait for both workers to end *) (* wait for both workers to end *)
join [ welcome ; main ; maintenance ; unblock ; Lwt.join [ welcome ; main ; maintenance ; unblock ;
discovery_answerer ; discovery_sender ] >>= fun () -> discovery_answerer ; discovery_sender ] >>= fun () ->
(* properly shutdown all peers *) (* properly shutdown all peers *)
let cancelers = let cancelers =
PeerMap.fold PeerMap.fold
(fun point _ peer res -> (fun point _ peer res ->
(peer.disconnect () >>= fun () -> (peer.disconnect () >>= fun () ->
connected := PeerMap.remove_by_point point !connected ; connected := PeerMap.remove_by_point point !connected ;
return ()) :: res) Lwt.return_unit) :: res)
!connected @@ !connected @@
PointMap.fold PointMap.fold
(fun point canceler res -> (fun point canceler res ->
(canceler () >>= fun () -> (canceler () >>= fun () ->
incoming := PointMap.remove point !incoming ; incoming := PointMap.remove point !incoming ;
return ()) :: res) Lwt.return_unit) :: res)
!incoming @@ [] !incoming @@ []
in in
join cancelers >>= fun () -> Lwt.join cancelers >>= fun () ->
debug "(%a) network shutdown complete" pp_gid my_gid ; debug "(%a) network shutdown complete" pp_gid my_gid ;
return () Lwt.return_unit
and peers () = and peers () =
PeerMap.fold (fun _ _ peer r -> peer :: r) !connected [] PeerMap.fold (fun _ _ peer r -> peer :: r) !connected []
and find_peer gid = try Some (PeerMap.by_gid gid !connected) with Not_found -> None and find_peer gid =
try Some (PeerMap.by_gid gid !connected) with Not_found -> None
and peer_info (peer : peer) = { and peer_info (peer : peer) = {
gid = peer.gid ; gid = peer.gid ;
addr = fst peer.point ; addr = fst peer.point ;
@ -1186,7 +1251,7 @@ module Make (P: PARAMS) = struct
and recv_from () = and recv_from () =
dequeue_msg () dequeue_msg ()
and send_to peer msg = and send_to peer msg =
peer.send (Message msg) >>= fun _ -> return () peer.send (Message msg) >>= fun _ -> Lwt.return_unit
and try_send peer msg = and try_send peer msg =
Lwt.async (fun () -> peer.send (Message msg)); true Lwt.async (fun () -> peer.send (Message msg)); true
and broadcast msg = and broadcast msg =
@ -1243,12 +1308,15 @@ module Make (P: PARAMS) = struct
and get_metadata _gid = None (* TODO: implement *) and get_metadata _gid = None (* TODO: implement *)
and set_metadata _gid _meta = () (* TODO: implement *) and set_metadata _gid _meta = () (* TODO: implement *)
in in
let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; let net =
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in { shutdown ; peers ; find_peer ;
recv_from ; send_to ; try_send ; broadcast ;
blacklist ; whitelist ; maintain ; roll ;
peer_info ; get_metadata ; set_metadata } in
(* main thread, returns after first successful maintenance *) (* main thread, returns after first successful maintenance *)
maintain () >>= fun () -> maintain () >>= fun () ->
debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; debug "(%a) network succesfully bootstrapped" pp_gid my_gid ;
return net Lwt.return net
let faked_network = let faked_network =
let infinity, wakeup = Lwt.wait () in let infinity, wakeup = Lwt.wait () in
@ -1268,8 +1336,10 @@ module Make (P: PARAMS) = struct
let peer_info _ = assert false in let peer_info _ = assert false in
let get_metadata _ = None in let get_metadata _ = None in
let set_metadata _ _ = () in let set_metadata _ _ = () in
{ shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; { shutdown ; peers ; find_peer ;
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } recv_from ; send_to ; try_send ; broadcast ;
blacklist ; whitelist ; maintain ; roll ;
peer_info ; get_metadata ; set_metadata }
(* Plug toplevel functions to callback calls. *) (* Plug toplevel functions to callback calls. *)