From ff1c08f876ce44850e06c1ed1a8b16e3b8c4fe4d Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 7 Nov 2016 14:32:10 +0100 Subject: [PATCH] P2p: refactor the `mli` --- src/Makefile | 4 +- src/node/net/p2p.ml | 2366 +++++++++++++++++-------------- src/node/net/p2p.mli | 132 +- src/node/shell/discoverer.ml | 4 +- src/node/shell/discoverer.mli | 2 +- src/node/shell/messages.ml | 108 +- src/node/shell/messages.mli | 9 +- src/node/shell/node.ml | 32 +- src/node/shell/node.mli | 2 - src/node/shell/prevalidator.ml | 6 +- src/node/shell/prevalidator.mli | 2 + src/node/shell/validator.ml | 4 +- src/node/shell/validator.mli | 2 + src/node_main.ml | 1 - 14 files changed, 1452 insertions(+), 1222 deletions(-) diff --git a/src/Makefile b/src/Makefile index 2f7e838d7..815a685c7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -231,7 +231,6 @@ NODE_LIB_INTFS := \ node/shell/prevalidator.mli \ node/shell/validator.mli \ \ - node/shell/messages.mli \ node/shell/discoverer.mli \ node/shell/node_rpc_services.mli \ node/shell/node.mli \ @@ -257,9 +256,10 @@ NODE_LIB_IMPLS := \ node/updater/proto_environment.ml \ node/updater/register.ml \ \ + node/shell/messages.ml \ + node/shell/netparams.ml \ node/shell/state.ml \ \ - node/shell/messages.ml \ node/shell/prevalidator.ml \ node/shell/validator.ml \ \ diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 488c8687c..30f36c406 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -7,20 +7,25 @@ (* *) (**************************************************************************) -module LU = Lwt_unix -module LC = Lwt_condition -open Lwt -open Lwt_utils -open Netbits -open Logging.Net - -let pp_gid ppf gid = - Format.pp_print_string ppf (Hex_encode.hex_encode gid) - (* public types *) type addr = Ipaddr.t type port = int -type version = string * int * int +type version = { + name : string ; + major : int ; + minor : int ; +} + +let version_encoding = + let open Data_encoding in + conv + (fun { name; major; minor } -> (name, major, minor)) + (fun (name, major, minor) -> { name; major; minor }) + (obj3 + (req "name" string) + (req "major" int8) + (req "minor" int8)) + type limits = { max_packet_size : int ; peer_answer_timeout : float ; @@ -32,1097 +37,1302 @@ type limits = { type config = { incoming_port : port option ; discovery_port : port option ; - supported_versions : version list ; known_peers : (addr * port) list ; peers_file : string ; closed_network : bool ; } -(* the common version for a pair of peers, if any, is the maximum one, - in lexicographic order *) -let common_version la lb = - let la = List.sort (fun l r -> compare r l) la in - let lb = List.sort (fun l r -> compare r l) lb in - let rec find = function - | [], _ | _, [] -> None - | ((a :: ta) as la), ((b :: tb) as lb) -> - if a = b then Some a - else if a < b then find (ta, lb) - else find (la, tb) - in find (la, lb) +type 'msg msg_encoding = Encoding : { + tag: int ; + encoding: 'a Data_encoding.t ; + wrap: 'a -> 'msg ; + unwrap: 'msg -> 'a option ; + max_length: int option ; + } -> 'msg msg_encoding -(* The global net identificator. *) -type gid = string +module type NET_PARAMS = sig + type meta (** Type of metadata associated to an identity *) + type msg (** Type of message used by higher layers *) -(* A net point (address x port). *) -type point = addr * port + val msg_encodings : msg msg_encoding list -(* Low-level network protocol packets (internal). The protocol is - completely symmetrical and asynchronous. First both peers must - present their credentials with a [Connect] packet, then any - combination of the other packets can be received at any time. An - exception is the [Disconnect] message, which should mark the end of - transmission (and needs not being replied). The [Unkown] packet is - not a real kind of packet, it means that something indecypherable - was transmitted. *) -type packet = - | Connect of gid * int option * version list - | Disconnect - | Advertise of (addr * port) list - | Message of Netbits.frame - | Ping - | Pong - | Bootstrap - | Unknown of Netbits.frame + val init_meta : meta + val score_enc : meta Data_encoding.t + val score: meta -> float -(* read a packet from a TCP socket *) -let recv_packet - : LU.file_descr -> int -> packet Lwt.t - = fun socket limit -> - Netbits.read socket limit >>= function - | None -> - return Disconnect - | Some frame -> - let decode_versions msg frame cb = - let rec decode_versions acc = function - | F [ B name ; S maj ; S min ] :: rest -> - decode_versions ((MBytes.to_string name, maj, min) :: acc) rest - | [] -> cb (List.rev acc) - | _ -> return (Unknown msg) - in decode_versions [] frame - in - match frame with - | [ S 1 ] -> return Disconnect - | [ S 2 ] -> return Ping - | [ S 12 ] -> return Pong - | [ S 3 ] -> return Bootstrap - | [ S 4 ; B gid ; S port ; F rest ] as msg -> - decode_versions msg rest @@ fun versions -> - return (Connect (MBytes.to_string gid, Some port, versions)) - | [ S 4 ; B gid ; F rest ] as msg -> - decode_versions msg rest @@ fun versions -> - return (Connect (MBytes.to_string gid, None, versions)) - | [ S 5 ; F rest ] as msg -> - let rec decode_peers acc = function - | F [ B addr ; S port ] :: rest -> begin - match Ipaddr.of_string @@ MBytes.to_string addr with - | Some addr -> - decode_peers ((addr, port) :: acc) rest - | None -> - decode_peers acc rest - end - | [] -> Advertise (List.rev acc) - | _ -> Unknown msg - in return (decode_peers [] rest) - | [ S 6 ; F rest ] -> return (Message rest) - | msg -> return (Unknown msg) - -(* send a packet over a TCP socket *) -let send_packet - : LU.file_descr -> packet -> bool Lwt.t - = fun socket packet -> - let frame = match packet with - | Unknown _ -> assert false (* should never happen *) - | Disconnect -> [ S 1 ] - | Ping -> [ S 2 ] - | Pong -> [ S 12 ] - | Bootstrap -> [ S 3 ] - | Connect (gid, port, versions) -> - let rec encode = function - | (name, maj, min) :: tl -> - let rest = encode tl in - F [ B (MBytes.of_string name) ; S maj ; S min ] :: rest - | [] -> [] - in - [ S 4 ; B (MBytes.of_string gid) ] - @ (match port with | Some port -> [ S port ] | None -> []) - @ [ F (encode versions) ] - | Advertise peers -> - let rec encode = function - | (addr, port) :: tl -> - let rest = encode tl in - F [ B (MBytes.of_string @@ Ipaddr.to_string addr) ; S port ] :: rest - | [] -> [] - in [ S 5 ; F (encode peers) ] - | Message message -> [ S 6 ; F message ] in - Netbits.write socket frame - -(* A net handler, as a record-encoded object, abstract from the - outside world. Hidden Lwt workers are associated to a net at its - creation and can be killed using the shutdown callback. *) -type net = { - recv_from : unit -> (peer * Netbits.frame) Lwt.t ; - send_to : peer * Netbits.frame -> unit Lwt.t ; - push : peer * Netbits.frame -> unit ; - broadcast : Netbits.frame -> unit ; - blacklist : ?duration:float -> addr -> unit ; - whitelist : peer -> unit ; - maintain : unit -> unit Lwt.t ; - roll : unit -> unit Lwt.t ; - shutdown : unit -> unit Lwt.t ; - peers : unit -> peer list ; - peer_info : peer -> addr * port * version ; -} - -(* A peer handle, as a record-encoded object, abstract from the - outside world. A hidden Lwt worker is associated to a peer at its - creation and is killed using the disconnect callback by net - workers (on shutdown of during maintenance). *) -and peer = { - gid : gid ; - point : point ; - listening_port : port option ; - version : version ; - last_seen : unit -> float ; - disconnect : unit -> unit Lwt.t; - send : packet -> unit Lwt.t ; -} - -(* The (internal) type of network events, those dispatched from peer - workers to the net and others internal to net workers. *) -and event = - | Disconnected of peer - | Bootstrap of peer - | Recv of peer * Netbits.frame - | Peers of point list - | Contact of point * LU.file_descr - | Connected of peer - | Shutdown - -(* Run-time point-or-gid indexed storage, one point is bound to at - most one gid, which is the invariant we want to keep both for the - connected peers table and the known peers one *) -module GidMap = Map.Make (struct type t = gid let compare = compare end) -module GidSet = Set.Make (struct type t = gid let compare = compare end) -module PointMap = Map.Make (struct type t = point let compare = compare end) -module PointSet = Set.Make (struct type t = point let compare = compare end) -module PeerMap : sig - type 'a t - val empty : 'a t - val by_point : point -> 'a t -> 'a - val by_gid : gid -> 'a t -> 'a - val gid_by_point : point -> 'a t -> gid option - val point_by_gid : gid -> 'a t -> point - val mem_by_point : point -> 'a t -> bool - val mem_by_gid : gid -> 'a t -> bool - val remove_by_point : point -> 'a t -> 'a t - val remove_by_gid : gid -> 'a t -> 'a t - val update : point -> ?gid : gid -> 'a -> 'a t -> 'a t - val fold : (point -> gid option -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b - val iter : (point -> gid option -> 'a -> unit) -> 'a t -> unit - val bindings : 'a t -> (point * gid option * 'a) list - val cardinal : 'a t -> int -end = struct - type 'a t = - { by_point : (gid option * 'a) PointMap.t ; - by_gid : (point * 'a) GidMap.t } - - let empty = - { by_point = PointMap.empty ; - by_gid = GidMap.empty } - - let by_point point { by_point } = - let (_, v) = PointMap.find point by_point in v - - let by_gid gid { by_gid } = - let (_, v) = GidMap.find gid by_gid in v - - let gid_by_point point { by_point } = - let (gid, _) = PointMap.find point by_point in gid - - let point_by_gid gid { by_gid } = - let (point, _) = GidMap.find gid by_gid in point - - let mem_by_point point { by_point } = - PointMap.mem point by_point - - let mem_by_gid gid { by_gid } = - GidMap.mem gid by_gid - - let remove_by_point point ({ by_point ; by_gid } as map) = - try - let (gid, _) = PointMap.find point by_point in - { by_point = PointMap.remove point by_point ; - by_gid = match gid with - | None -> by_gid - | Some gid -> GidMap.remove gid by_gid } - with Not_found -> map - - let remove_by_gid gid ({ by_point ; by_gid } as map) = - try - let (point, _) = GidMap.find gid by_gid in - { by_point = PointMap.remove point by_point ; - by_gid = GidMap.remove gid by_gid } - with Not_found -> map - - let update point ?gid v map = - let { by_point ; by_gid } = - let map = remove_by_point point map in - match gid with Some gid -> remove_by_gid gid map | None -> map in - { by_point = PointMap.add point (gid, v) by_point ; - by_gid = match gid with Some gid -> GidMap.add gid (point, v) by_gid - | None -> by_gid } - - let fold f { by_point } init = - PointMap.fold - (fun point (gid, v) r -> f point gid v r) by_point init - - let iter f { by_point } = - PointMap.iter - (fun point (gid, v) -> f point gid v) by_point - - let cardinal { by_point } = - PointMap.cardinal by_point - - let bindings map = - fold (fun point gid v l -> (point, gid, v) :: l) map [] + (** High level protocol(s) talked by the peer. When two peers + initiate a connection, they exchange their list of supported + versions. The chosen one, if any, is the maximum common one (in + lexicographic order) *) + val supported_versions : version list end -(* Builds a peer and launches its associated worker. Takes a push - function for communicating with the main worker using events - (including the one sent when the connection is alive). Returns a - canceler. *) -let connect_to_peer config limits my_gid socket (addr, port) push white_listed = - (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = canceler () in - (* a cancelable reception *) - let recv () = - pick [ recv_packet socket limits.max_packet_size ; - (cancelation () >>= fun () -> return Disconnect) ] in - (* First step: send and receive credentials, makes no difference - whether we're trying to connect to a peer or checking an incoming - connection, both parties must first present themselves. *) - let rec connect () = - send_packet socket (Connect (my_gid, - config.incoming_port, - config.supported_versions)) >>= fun _ -> - pick [ (LU.sleep limits.peer_answer_timeout >>= fun () -> return Disconnect) ; - recv () ] >>= function - | Connect (gid, listening_port, versions) -> - debug "(%a) connection requested from %a @ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - begin match common_version config.supported_versions versions with - | None -> - debug "(%a) connection rejected (incompatible versions) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | Some version -> - if config.closed_network then - match listening_port with - | Some port when white_listed (addr, port) -> - connected version gid listening_port - | Some port -> - debug "(%a) connection rejected (out of the closed network) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | None -> - debug "(%a) connection rejected (out of the closed network) from %a:unknown" - pp_gid my_gid Ipaddr.pp_hum addr ; - cancel () - else - connected version gid listening_port +module type S = sig + include NET_PARAMS + + type net + + (** A faked p2p layer, which do not initiate any connection + nor open any listening socket. *) + val faked_network : net + + (** Main network initialisation function *) + val bootstrap : config:config -> limits:limits -> net Lwt.t + + (** A maintenance operation : try and reach the ideal number of peers *) + val maintain : net -> unit Lwt.t + + (** Voluntarily drop some peers and replace them by new buddies *) + val roll : net -> unit Lwt.t + + (** Close all connections properly *) + val shutdown : net -> unit Lwt.t + + (** A connection to a peer *) + type peer + + (** A global identifier for a peer, a.k.a. an identity *) + type gid + + (** Access the domain of active peers *) + val peers : net -> peer list + + (** Return the active peer with identity [gid] *) + val find_peer : net -> gid -> peer option + + type peer_info = { + gid : gid; + addr : addr; + port : port; + version : version; + } + + (** Access the info of an active peer, if available *) + val peer_info : net -> peer -> peer_info + + (** Accessors for meta information about a peer *) + val get_meta : net -> gid -> meta option + val set_meta : net -> gid -> meta -> unit + + (** Wait for a payload from any peer in the network *) + val recv : net -> (peer * msg) Lwt.t + + (** Send a payload to a peer and wait for it to be in the tube *) + val send : net -> peer -> msg -> unit Lwt.t + + (** Send a payload to a peer without waiting for the result. Return + [true] if the message can be enqueued in the peer's output queue + or [false] otherwise. *) + val try_send : net -> peer -> msg -> bool + + (** Send a payload to all peers *) + val broadcast : net -> msg -> unit + + (** Shutdown the connection to all peers at this address and stop the + communications with this machine for [duration] seconds *) + val blacklist : net -> gid -> unit + + (** Keep a connection to this pair as often as possible *) + val whitelist : net -> gid -> unit +end + +module Make (P: NET_PARAMS) = struct + module LU = Lwt_unix + module LC = Lwt_condition + open Lwt + open Lwt_utils + open Netbits + open Logging.Net + + let pp_gid ppf gid = + Format.pp_print_string ppf (Hex_encode.hex_encode gid) + + (* the common version for a pair of peers, if any, is the maximum one, + in lexicographic order *) + let common_version la lb = + let la = List.sort (fun l r -> compare r l) la in + let lb = List.sort (fun l r -> compare r l) lb in + let rec find = function + | [], _ | _, [] -> None + | ((a :: ta) as la), ((b :: tb) as lb) -> + if a = b then Some a + else if a < b then find (ta, lb) + else find (la, tb) + in find (la, lb) + + (* The global net identificator. *) + type gid = string + + (* A net point (address x port). *) + type point = addr * port + + let point_encoding = + let open Data_encoding in + let open Ipaddr in + conv + (fun (addr, port) -> + (match addr with + | V4 v4 -> V4.to_bytes v4 + | V6 v6 -> V6.to_bytes v6), port) + (fun (addr, port) -> + (match String.length addr with + | 4 -> V4 (V4.of_bytes_exn addr) + | 16 -> V6 (V6.of_bytes_exn addr) + | _ -> Pervasives.failwith "point_encoding"), port) + (obj2 + (req "addr" string) + (req "port" int16)) + + (* Low-level network protocol packets (internal). The protocol is + completely symmetrical and asynchronous. First both peers must + present their credentials with a [Connect] packet, then any + combination of the other packets can be received at any time. An + exception is the [Disconnect] message, which should mark the end of + transmission (and needs not being replied). The [Unkown] packet is + not a real kind of packet, it means that something indecypherable + was transmitted. *) + type hello = { + gid: gid; + port: int option; + versions: version list; + } + + let hello_encoding = + let open Data_encoding in + conv + (fun { gid; port; versions } -> (gid, port, versions)) + (fun (gid, port, versions) -> { gid; port; versions }) + (obj3 + (req "gid" (Fixed.string 16)) (* TODO: get rid of constant *) + (opt "port" int16) + (req "versions" (Variable.list version_encoding))) + + type msg = + | Connect of hello + | Disconnect + | Advertise of point list + | Ping + | Pong + | Bootstrap + | Message of P.msg + + let msg_encoding = + let open Data_encoding in + union ~tag_size:`Uint8 begin [ + case ~tag:0x00 hello_encoding + (function Connect hello -> Some hello | _ -> None) + (fun hello -> Connect hello); + case ~tag:0x01 null + (function Disconnect -> Some () | _ -> None) + (fun () -> Disconnect); + case ~tag:0x02 null + (function Ping -> Some () | _ -> None) + (fun () -> Ping); + case ~tag:0x03 null + (function Pong -> Some () | _ -> None) + (fun () -> Pong); + case ~tag:0x04 (Variable.list point_encoding) + (function Advertise points -> Some points | _ -> None) + (fun points -> Advertise points); + case ~tag:0x05 null + (function Bootstrap -> Some () | _ -> None) + (fun () -> Bootstrap); + ] @ + ListLabels.map P.msg_encodings ~f:begin function Encoding { tag; encoding; wrap; unwrap } -> + case ~tag encoding + (function Message msg -> unwrap msg | _ -> None) + (fun msg -> Message (wrap msg)) + end + end + + let max_length = function + | 0 -> Some 1024 + | 1 -> Some 0 + | 2 -> Some 0 + | 3 -> Some 0 + | 4 -> Some (1 + 1000 * 17) (* tag + 1000 * max (point size) *) + | 5 -> Some 0 + | n -> ListLabels.fold_left P.msg_encodings ~init:None ~f:begin fun a -> function + Encoding { tag; max_length } -> if tag = n then max_length else a + end + + module BE = EndianBigstring.BigEndian + + (** Read a message from a file descriptor and returns (tag, msg) *) + let read fd buf = + let rec read_into_exactly ?(pos=0) ?len descr buf = + let len = match len with None -> MBytes.length buf | Some l -> l in + let rec inner pos len = + if len = 0 then + Lwt.return_unit + else + Lwt_bytes.read descr buf pos len >>= fun nb_read -> + inner (pos + nb_read) (len - nb_read) + in + inner pos len + in + catch (fun () -> + Lwt_bytes.recv fd buf 0 4 [ Lwt_unix.MSG_PEEK ] >>= fun hdrlen -> + if hdrlen <> 4 then begin + debug "read: could not read enough bytes to determine message size, aborting"; + return None + end + else + Lwt_bytes.read fd buf 0 4 >>= fun _hdrlen -> + let len = Int32.to_int (BE.get_int32 buf 0) in + if len < 0 || len > MBytes.length buf then begin + debug "read: invalid message size %d" len; + return None end - | Advertise peers -> - (* alternatively, one can refuse a connection but reply with - some peers, so we accept this info *) - debug "(%a) new peers received from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - push (Peers peers) ; - cancel () - | Disconnect -> - debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - | _ -> - debug "(%a) connection rejected (bad connection request) from %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - cancel () - (* Them we can build the net object and launch the worker. *) - and connected version gid listening_port = - (* net object state *) - let last = ref (Unix.gettimeofday ()) in - (* net object callbaks *) - let last_seen () = !last in - let disconnect () = cancel () in - let send p = send_packet socket p >>= fun _ -> return () in - (* net object construction *) - let peer = { gid ; point = (addr, port) ; listening_port ; - version ; last_seen ; disconnect ; send } in - (* The packet reception loop. *) - let rec receiver () = - recv () >>= fun packet -> - last := Unix.gettimeofday () ; - match packet with - | Connect _ - | Unknown _ -> - debug "(%a) disconnected (bad request) %a @ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - cancel () - | Disconnect -> - debug "(%a) disconnected (by peer) %a @ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - cancel () - | Bootstrap -> push (Bootstrap peer) ; receiver () - | Advertise peers -> push (Peers peers) ; receiver () - | Ping -> send_packet socket Pong >>= fun _ -> receiver () - | Pong -> receiver () - | Message msg -> - push (Recv (peer, msg)) ; receiver () - in - (* The polling loop *) - let rec pulse_monitor ping = - pick [ (cancelation () >>= fun () -> return false) ; - (LU.sleep limits.peer_answer_timeout >>= fun () -> return true)] - >>= fun continue -> - if continue then - match ping with - | Some tping -> - if !last -. tping < 0. then begin - debug "(%a) disconnected (timeout exceeded) %a @ %a:%d" - pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; - cancel () - end else - pulse_monitor None - | None -> - let now = Unix.gettimeofday () in - if now -. !last < limits.peer_answer_timeout then - pulse_monitor None - else - send_packet socket Ping >>= fun _ -> - pulse_monitor (Some (Unix.gettimeofday ())) - else return () - in - (* Events for the main worker *) - push (Connected peer) ; - on_cancel (fun () -> push (Disconnected peer) ; return ()) ; - (* Launch both workers *) - join [ pulse_monitor None ; receiver () ] - in - on_cancel (fun () -> - send_packet socket Disconnect >>= fun _ -> - LU.close socket >>= fun _ -> - return ()) ; - let worker_name = - Format.asprintf - "(%a) connection handler for %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port in - ignore (worker ~safe:true worker_name ~run:connect ~cancel) ; - (* return the canceler *) - cancel + else + read_into_exactly fd buf ~pos:4 ~len >|= fun () -> + let tag = BE.get_uint8 buf 4 in + Some (tag, MBytes.sub buf 4 len)) + (function + | Unix.Unix_error (_err, _, _) -> return None + | e -> fail e) - -(* JSON format for on-disk peers cache file *) -let addr_encoding = - let open Data_encoding in - splitted - ~json: - (conv Ipaddr.to_string (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) string) - ~binary: - (union ~tag_size:`Uint8 - [ case ~tag:4 - (Fixed.string 4) - (fun ip -> Utils.map_option Ipaddr.V4.to_bytes (Ipaddr.to_v4 ip) ) - (fun b -> Ipaddr.(V4 (V4.of_bytes_exn b))) ; - case ~tag:6 - (Fixed.string 32) - (fun ip -> Some (Ipaddr.V6.to_bytes (Ipaddr.to_v6 ip))) - (fun b -> Ipaddr.(V6 (V6.of_bytes_exn b))) ; - ]) - -let peers_file_encoding = - let open Data_encoding in - obj2 - (req "gid" string) - (req "peers" - (obj3 - (req "known" - (list (obj3 - (req "addr" addr_encoding) - (req "port" int31) - (opt "infos" - (obj3 - (req "connections" int31) - (req "lastSeen" float) - (req "gid" string)))))) - (req "blacklisted" - (list (obj2 - (req "addr" addr_encoding) - (req "until" float)))) - (req "whitelisted" - (list (obj2 - (req "addr" addr_encoding) - (req "port" int31)))))) - -(* Info on peers maintained between connections *) -type source = - { unreachable_since : float option; - connections : (int * float) option ; - white_listed : bool } - -(* Ad hoc comparison on sources such as good source < bad source *) -let compare_sources s1 s2 = - match s1.white_listed, s2.white_listed with - | true, false -> -1 | false, true -> 1 - | _, _ -> - match s1.unreachable_since, s2.unreachable_since with - | None, Some _ -> -1 | Some _, None -> 1 - | _, _ -> - match s1.connections, s2.connections with - | Some _, None -> -1 | None, Some _ -> 1 | None, None -> 0 - | Some (n1, t1), Some (n2, t2) -> - if n1 = n2 then compare t2 t1 - else compare n2 n1 - -(* A store for blacklisted addresses (we ban any peer on a blacklisted - address, which is the policy that seems to make the most sense) *) -module BlackList = Map.Make (struct type t = addr let compare = compare end) - -(* A good random string so it is probably unique on the network *) -let fresh_gid () = - Bytes.to_string @@ Sodium.Random.Bytes.generate 16 - - -(* The (fixed size) broadcast frame. *) -let discovery_message gid port = - Netbits.([ B (MBytes.of_string "DISCO") ; B (MBytes.of_string gid) ; S port ]) - -(* Broadcast frame verifier. *) -let answerable_discovery_message message my_gid when_ok when_not = - match message with - | Some [ B magic ; B gid ; S port ] -> - if MBytes.to_string magic = "DISCO" && MBytes.to_string gid <> my_gid then - when_ok gid port - else when_not () - | _ -> when_not () - -let string_of_unix_exn = function - | Unix.Unix_error (err, fn, _) -> "in " ^ fn ^ ", " ^ Unix.error_message err - | exn -> Printexc.to_string exn - -(* Launch an answer machine for the discovery mechanism, takes a - callback to fill the answers and returns a canceler function *) -let discovery_answerer my_gid disco_port cancelation callback = - (* init a UDP listening socket on the broadcast canal *) - catch - (fun () -> - let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.(setsockopt main_socket SO_BROADCAST true) ; - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; - return (Some main_socket)) - (fun exn -> - debug "(%a) will not listen to discovery requests (%s)" - pp_gid my_gid (string_of_unix_exn exn) ; - return None) >>= function - | None -> return () - | Some main_socket -> - (* the answering function *) - let rec step () = - let buffer = Netbits.to_raw (discovery_message my_gid 0) in - let len = MBytes.length buffer in - pick [ (cancelation () >>= fun () -> return None) ; - (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> - return (Some r)) ] >>= function - | Some (len', LU.ADDR_INET (addr, _)) -> - if len' <> len then - step () (* drop bytes, better luck next time ! *) - else - answerable_discovery_message (Netbits.of_raw buffer) my_gid - (fun _ port -> - catch - (fun () -> - let ipaddr = Ipaddr_unix.of_inet_addr addr in - let socket = LU.(socket (match ipaddr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in - LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> - callback ipaddr port socket >>= fun () -> - return ()) - (fun _ -> (* ignore errors *) return ()) >>= fun () -> - step ()) - step - | Some (_, _) -> - step () - | None -> return () - in step () - -(* Sends dicover messages into space in an exponentially delayed loop, - restartable using a condition *) -let discovery_sender my_gid disco_port inco_port cancelation restart = - let message = discovery_message my_gid inco_port in - let rec loop delay n = + (** Write a message to file descriptor. *) + let write ?(pos=0) ?len descr buf = + let len = match len with None -> MBytes.length buf | Some l -> l in catch (fun () -> - let socket = LU.(socket PF_INET SOCK_DGRAM 0) in - LU.setsockopt socket LU.SO_BROADCAST true ; - LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () -> - Netbits.(write socket message) >>= fun _ -> - LU.close socket) - (fun _ -> - debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; - return ()) >>= fun () -> - pick [ (LU.sleep delay >>= fun () -> return (Some (delay, n + 1))) ; - (cancelation () >>= fun () -> return None) ; - (LC.wait restart >>= fun () -> return (Some (0.1, 0))) ] >>= function + Lwt_bytes.write descr buf pos len >>= fun _nb_written -> + return true) + (function + | Unix.Unix_error _ -> return false + | e -> fail e) + + (* read a message from a TCP socket *) + let recv_msg fd buf = + read fd buf >|= function + | None -> None + | Some (tag, msg) -> + match max_length tag with + | Some len when MBytes.length msg > len -> None + | _ -> Data_encoding.Binary.of_bytes msg_encoding msg + + (* send a message over a TCP socket *) + let send_msg fd buf packet = + catch + (fun () -> + match Data_encoding.Binary.write msg_encoding packet buf 4 with + | None -> return false + | Some len -> + BE.set_int32 buf 0 @@ Int32.of_int (len - 4); + write fd buf ~len + ) + (fun exn -> Lwt.fail exn) + + (* A peer handle, as a record-encoded object, abstract from the + outside world. A hidden Lwt worker is associated to a peer at its + creation and is killed using the disconnect callback by net + workers (on shutdown of during maintenance). *) + type peer = { + gid : gid ; + point : point ; + listening_port : port option ; + version : version ; + last_seen : unit -> float ; + disconnect : unit -> unit Lwt.t; + send : msg -> unit Lwt.t ; + } + + type peer_info = { + gid : gid ; + addr : addr ; + port : port ; + version : version ; + } + + (* A net handler, as a record-encoded object, abstract from the + outside world. Hidden Lwt workers are associated to a net at its + creation and can be killed using the shutdown callback. *) + type net = { + recv_from : unit -> (peer * P.msg) Lwt.t ; + send_to : peer -> P.msg -> unit Lwt.t ; + try_send : peer -> P.msg -> bool ; + broadcast : P.msg -> unit ; + blacklist : ?duration:float -> addr -> unit ; + whitelist : peer -> unit ; + maintain : unit -> unit Lwt.t ; + roll : unit -> unit Lwt.t ; + shutdown : unit -> unit Lwt.t ; + peers : unit -> peer list ; + find_peer : gid -> peer option ; + peer_info : peer -> peer_info ; + set_meta : gid -> P.meta -> unit ; + get_meta : gid -> P.meta option ; + } + + (* The (internal) type of network events, those dispatched from peer + workers to the net and others internal to net workers. *) + type event = + | Disconnected of peer + | Bootstrap of peer + | Recv of peer * P.msg + | Peers of point list + | Contact of point * LU.file_descr + | Connected of peer + | Shutdown + + (* Run-time point-or-gid indexed storage, one point is bound to at + most one gid, which is the invariant we want to keep both for the + connected peers table and the known peers one *) + module GidMap = Map.Make (struct type t = gid let compare = compare end) + module GidSet = Set.Make (struct type t = gid let compare = compare end) + module PointMap = Map.Make (struct type t = point let compare = compare end) + module PointSet = Set.Make (struct type t = point let compare = compare end) + module PeerMap : sig + type 'a t + val empty : 'a t + val by_point : point -> 'a t -> 'a + val by_gid : gid -> 'a t -> 'a + val gid_by_point : point -> 'a t -> gid option + val point_by_gid : gid -> 'a t -> point + val mem_by_point : point -> 'a t -> bool + val mem_by_gid : gid -> 'a t -> bool + val remove_by_point : point -> 'a t -> 'a t + val remove_by_gid : gid -> 'a t -> 'a t + val update : point -> ?gid : gid -> 'a -> 'a t -> 'a t + val fold : (point -> gid option -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b + val iter : (point -> gid option -> 'a -> unit) -> 'a t -> unit + val bindings : 'a t -> (point * gid option * 'a) list + val cardinal : 'a t -> int + end = struct + type 'a t = + { by_point : (gid option * 'a) PointMap.t ; + by_gid : (point * 'a) GidMap.t } + + let empty = + { by_point = PointMap.empty ; + by_gid = GidMap.empty } + + let by_point point { by_point } = + let (_, v) = PointMap.find point by_point in v + + let by_gid gid { by_gid } = + let (_, v) = GidMap.find gid by_gid in v + + let gid_by_point point { by_point } = + let (gid, _) = PointMap.find point by_point in gid + + let point_by_gid gid { by_gid } = + let (point, _) = GidMap.find gid by_gid in point + + let mem_by_point point { by_point } = + PointMap.mem point by_point + + let mem_by_gid gid { by_gid } = + GidMap.mem gid by_gid + + let remove_by_point point ({ by_point ; by_gid } as map) = + try + let (gid, _) = PointMap.find point by_point in + { by_point = PointMap.remove point by_point ; + by_gid = match gid with + | None -> by_gid + | Some gid -> GidMap.remove gid by_gid } + with Not_found -> map + + let remove_by_gid gid ({ by_point ; by_gid } as map) = + try + let (point, _) = GidMap.find gid by_gid in + { by_point = PointMap.remove point by_point ; + by_gid = GidMap.remove gid by_gid } + with Not_found -> map + + let update point ?gid v map = + let { by_point ; by_gid } = + let map = remove_by_point point map in + match gid with Some gid -> remove_by_gid gid map | None -> map in + { by_point = PointMap.add point (gid, v) by_point ; + by_gid = match gid with Some gid -> GidMap.add gid (point, v) by_gid + | None -> by_gid } + + let fold f { by_point } init = + PointMap.fold + (fun point (gid, v) r -> f point gid v r) by_point init + + let iter f { by_point } = + PointMap.iter + (fun point (gid, v) -> f point gid v) by_point + + let cardinal { by_point } = + PointMap.cardinal by_point + + let bindings map = + fold (fun point gid v l -> (point, gid, v) :: l) map [] + end + + (* Builds a peer and launches its associated worker. Takes a push + function for communicating with the main worker using events + (including the one sent when the connection is alive). Returns a + canceler. *) + let connect_to_peer config limits my_gid socket (addr, port) push white_listed = + (* a non exception-based cancelation mechanism *) + let cancelation, cancel, on_cancel = canceler () in + (* a cancelable reception *) + let recv buf = + pick [ (recv_msg socket buf >|= function Some p -> p | None -> Disconnect); + (cancelation () >>= fun () -> return Disconnect) ] in + (* First step: send and receive credentials, makes no difference + whether we're trying to connect to a peer or checking an incoming + connection, both parties must first present themselves. *) + let rec connect buf = + send_msg socket buf (Connect { gid = my_gid ; + port = config.incoming_port ; + versions = P.supported_versions }) >>= fun _ -> + pick [ (LU.sleep limits.peer_answer_timeout >>= fun () -> return Disconnect) ; + recv buf ] >>= function + | Connect { gid; port = listening_port; versions } -> + debug "(%a) connection requested from %a @ %a:%d" + pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; + begin match common_version P.supported_versions versions with + | None -> + debug "(%a) connection rejected (incompatible versions) from %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + cancel () + | Some version -> + if config.closed_network then + match listening_port with + | Some port when white_listed (addr, port) -> + connected buf version gid listening_port + | Some port -> + debug "(%a) connection rejected (out of the closed network) from %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + cancel () + | None -> + debug "(%a) connection rejected (out of the closed network) from %a:unknown" + pp_gid my_gid Ipaddr.pp_hum addr ; + cancel () + else + connected buf version gid listening_port + end + | Advertise peers -> + (* alternatively, one can refuse a connection but reply with + some peers, so we accept this info *) + debug "(%a) new peers received from %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + push (Peers peers) ; + cancel () + | Disconnect -> + debug "(%a) connection rejected (closed by peer or timeout) from %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + cancel () + | _ -> + debug "(%a) connection rejected (bad connection request) from %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + cancel () + (* Them we can build the net object and launch the worker. *) + and connected buf version gid listening_port = + (* net object state *) + let last = ref (Unix.gettimeofday ()) in + (* net object callbaks *) + let last_seen () = !last in + let disconnect () = cancel () in + let send p = send_msg socket buf p >>= fun _ -> return () in + (* net object construction *) + let peer = { gid ; point = (addr, port) ; listening_port ; + version ; last_seen ; disconnect ; send } in + (* The packet reception loop. *) + let rec receiver () = + recv buf >>= fun packet -> + last := Unix.gettimeofday () ; + match packet with + | Connect _ + | Disconnect -> + debug "(%a) disconnected (by peer) %a @ %a:%d" + pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; + cancel () + | Bootstrap -> push (Bootstrap peer) ; receiver () + | Advertise peers -> push (Peers peers) ; receiver () + | Ping -> send_msg socket buf Pong >>= fun _ -> receiver () + | Pong -> receiver () + | Message msg -> push (Recv (peer, msg)) ; receiver () + in + (* The polling loop *) + let rec pulse_monitor ping = + pick [ (cancelation () >>= fun () -> return false) ; + (LU.sleep limits.peer_answer_timeout >>= fun () -> return true)] + >>= fun continue -> + if continue then + match ping with + | Some tping -> + if !last -. tping < 0. then begin + debug "(%a) disconnected (timeout exceeded) %a @ %a:%d" + pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ; + cancel () + end else + pulse_monitor None + | None -> + let now = Unix.gettimeofday () in + if now -. !last < limits.peer_answer_timeout then + pulse_monitor None + else + send_msg socket buf Ping >>= fun _ -> + pulse_monitor (Some (Unix.gettimeofday ())) + else return () + in + (* Events for the main worker *) + push (Connected peer) ; + on_cancel (fun () -> push (Disconnected peer) ; return ()) ; + (* Launch both workers *) + join [ pulse_monitor None ; receiver () ] + in + let buf = MBytes.create 0x100_000 in + on_cancel (fun () -> + send_msg socket buf Disconnect >>= fun _ -> + LU.close socket >>= fun _ -> + return ()) ; + let worker_name = + Format.asprintf + "(%a) connection handler for %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port in + ignore (worker ~safe:true worker_name ~run:(fun () -> connect buf) ~cancel) ; + (* return the canceler *) + cancel + + + (* JSON format for on-disk peers cache file *) + let addr_encoding = + let open Data_encoding in + splitted + ~json: + (conv Ipaddr.to_string (Data_encoding.Json.wrap_error Ipaddr.of_string_exn) string) + ~binary: + (union ~tag_size:`Uint8 + [ case ~tag:4 + (Fixed.string 4) + (fun ip -> Utils.map_option Ipaddr.V4.to_bytes (Ipaddr.to_v4 ip) ) + (fun b -> Ipaddr.(V4 (V4.of_bytes_exn b))) ; + case ~tag:6 + (Fixed.string 32) + (fun ip -> Some (Ipaddr.V6.to_bytes (Ipaddr.to_v6 ip))) + (fun b -> Ipaddr.(V6 (V6.of_bytes_exn b))) ; + ]) + + let peers_file_encoding = + let open Data_encoding in + obj2 + (req "gid" string) + (req "peers" + (obj3 + (req "known" + (list (obj3 + (req "addr" addr_encoding) + (req "port" int31) + (opt "infos" + (obj3 + (req "connections" int31) + (req "lastSeen" float) + (req "gid" string)))))) + (req "blacklisted" + (list (obj2 + (req "addr" addr_encoding) + (req "until" float)))) + (req "whitelisted" + (list (obj2 + (req "addr" addr_encoding) + (req "port" int31)))))) + + (* Info on peers maintained between connections *) + type source = { + unreachable_since : float option; + connections : (int * float) option ; + white_listed : bool ; + meta : P.meta ; + } + + (* Ad hoc comparison on sources such as good source < bad source *) + let compare_sources s1 s2 = + match s1.white_listed, s2.white_listed with + | true, false -> -1 | false, true -> 1 + | _, _ -> + match s1.unreachable_since, s2.unreachable_since with + | None, Some _ -> -1 | Some _, None -> 1 + | _, _ -> + match s1.connections, s2.connections with + | Some _, None -> -1 | None, Some _ -> 1 | None, None -> 0 + | Some (n1, t1), Some (n2, t2) -> + if n1 = n2 then compare t2 t1 + else compare n2 n1 + + (* A store for blacklisted addresses (we ban any peer on a blacklisted + address, which is the policy that seems to make the most sense) *) + module BlackList = Map.Make (struct type t = addr let compare = compare end) + + (* A good random string so it is probably unique on the network *) + let fresh_gid () = + Bytes.to_string @@ Sodium.Random.Bytes.generate 16 + + + (* The (fixed size) broadcast frame. *) + let discovery_message gid port = + Netbits.([ B (MBytes.of_string "DISCO") ; B (MBytes.of_string gid) ; S port ]) + + (* Broadcast frame verifier. *) + let answerable_discovery_message message my_gid when_ok when_not = + match message with + | Some [ B magic ; B gid ; S port ] -> + if MBytes.to_string magic = "DISCO" && MBytes.to_string gid <> my_gid then + when_ok gid port + else when_not () + | _ -> when_not () + + let string_of_unix_exn = function + | Unix.Unix_error (err, fn, _) -> "in " ^ fn ^ ", " ^ Unix.error_message err + | exn -> Printexc.to_string exn + + (* Launch an answer machine for the discovery mechanism, takes a + callback to fill the answers and returns a canceler function *) + let discovery_answerer my_gid disco_port cancelation callback = + (* init a UDP listening socket on the broadcast canal *) + catch + (fun () -> + let main_socket = LU.(socket PF_INET SOCK_DGRAM 0) in + LU.(setsockopt main_socket SO_BROADCAST true) ; + LU.(setsockopt main_socket SO_REUSEADDR true) ; + LU.(bind main_socket (ADDR_INET (Unix.inet_addr_any, disco_port))) ; + return (Some main_socket)) + (fun exn -> + debug "(%a) will not listen to discovery requests (%s)" + pp_gid my_gid (string_of_unix_exn exn) ; + return None) >>= function + | None -> return () + | Some main_socket -> + (* the answering function *) + let rec step () = + let buffer = Netbits.to_raw (discovery_message my_gid 0) in + let len = MBytes.length buffer in + pick [ (cancelation () >>= fun () -> return None) ; + (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> + return (Some r)) ] >>= function + | Some (len', LU.ADDR_INET (addr, _)) -> + if len' <> len then + step () (* drop bytes, better luck next time ! *) + else + answerable_discovery_message (Netbits.of_raw buffer) my_gid + (fun _ port -> + catch + (fun () -> + let ipaddr = Ipaddr_unix.of_inet_addr addr in + let socket = LU.(socket (match ipaddr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in + LU.connect socket LU.(ADDR_INET (addr, port)) >>= fun () -> + callback ipaddr port socket >>= fun () -> + return ()) + (fun _ -> (* ignore errors *) return ()) >>= fun () -> + step ()) + step + | Some (_, _) -> + step () + | None -> return () + in step () + + (* Sends dicover messages into space in an exponentially delayed loop, + restartable using a condition *) + let discovery_sender my_gid disco_port inco_port cancelation restart = + let message = discovery_message my_gid inco_port in + let rec loop delay n = + catch + (fun () -> + let socket = LU.(socket PF_INET SOCK_DGRAM 0) in + LU.setsockopt socket LU.SO_BROADCAST true ; + LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () -> + Netbits.(write socket message) >>= fun _ -> + LU.close socket) + (fun _ -> + debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; + return ()) >>= fun () -> + pick [ (LU.sleep delay >>= fun () -> return (Some (delay, n + 1))) ; + (cancelation () >>= fun () -> return None) ; + (LC.wait restart >>= fun () -> return (Some (0.1, 0))) ] >>= function | Some (delay, n) when n = 10 -> loop delay 9 | Some (delay, n) -> loop (delay *. 2.) n | None -> return () - in loop 0.2 1 + in loop 0.2 1 -(* Main network creation and initialisation function *) -let bootstrap config limits = - (* we need to ignore SIGPIPEs *) - Sys.(set_signal sigpipe Signal_ignore) ; - (* a non exception-based cancelation mechanism *) - let cancelation, cancel, on_cancel = canceler () in - (* create the internal event queue *) - let enqueue_event, dequeue_event = - let queue, enqueue = Lwt_stream.create () in - (fun msg -> enqueue (Some msg)), - (fun () -> Lwt_stream.next queue) - in - (* create the external message queue *) - let enqueue_msg, dequeue_msg, close_msg_queue = - let queue, enqueue = Lwt_stream.create () in - (fun msg -> enqueue (Some msg)), - (fun () -> Lwt_stream.next queue), - (fun () -> enqueue None) - in - on_cancel (fun () -> close_msg_queue () ; return ()) ; - (* fill the known peers pools from last time *) - Data_encoding.Json.read_file config.peers_file >>= fun res -> - let known_peers, black_list, my_gid = - let init_peers () = - let my_gid = - fresh_gid () in - let known_peers = - let source = + (* Main network creation and initialisation function *) + let bootstrap ~config ~limits = + (* we need to ignore SIGPIPEs *) + Sys.(set_signal sigpipe Signal_ignore) ; + (* a non exception-based cancelation mechanism *) + let cancelation, cancel, on_cancel = canceler () in + (* create the internal event queue *) + let enqueue_event, dequeue_event = + let queue, enqueue = Lwt_stream.create () in + (fun msg -> enqueue (Some msg)), + (fun () -> Lwt_stream.next queue) + in + (* create the external message queue *) + let enqueue_msg, dequeue_msg, close_msg_queue = + let queue, enqueue = Lwt_stream.create () in + (fun msg -> enqueue (Some msg)), + (fun () -> Lwt_stream.next queue), + (fun () -> enqueue None) + in + on_cancel (fun () -> close_msg_queue () ; return ()) ; + (* fill the known peers pools from last time *) + Data_encoding.Json.read_file config.peers_file >>= fun res -> + let known_peers, black_list, my_gid = + let init_peers () = + let my_gid = + fresh_gid () in + let known_peers = + let source = { unreachable_since = None ; + connections = None ; + white_listed = true ; + meta = P.init_meta ; + } + in + List.fold_left + (fun r point -> PeerMap.update point source r) + PeerMap.empty config.known_peers in + let black_list = + BlackList.empty in + known_peers, black_list, my_gid in + match res with + | None -> + let known_peers, black_list, my_gid = init_peers () in + debug "(%a) peer cache initiated" pp_gid my_gid ; + ref known_peers, ref black_list, my_gid + | Some json -> + match Data_encoding.Json.destruct peers_file_encoding json with + | exception _ -> + let known_peers, black_list, my_gid = init_peers () in + debug "(%a) peer cache reset" pp_gid my_gid ; + ref known_peers, ref black_list, my_gid + | (my_gid, (k, b, w)) -> + let white_list = + List.fold_right PointSet.add w PointSet.empty in + let known_peers = + List.fold_left + (fun r (addr, port, infos) -> + match infos with + | None -> + let source = + { unreachable_since = None ; + connections = None ; + white_listed = true ; + meta = P.init_meta ; } in + PeerMap.update (addr, port) source r + | Some (c, t, gid) -> + let source = + { unreachable_since = None ; + connections = Some (c, t) ; + white_listed = PointSet.mem (addr, port) white_list ; + meta = P.init_meta ; } in + PeerMap.update (addr, port) ~gid source r) + PeerMap.empty k in + let black_list = + List.fold_left + (fun r (a, d) -> BlackList.add a d r) + BlackList.empty b in + debug "(%a) peer cache loaded" pp_gid my_gid ; + ref known_peers, ref black_list, my_gid + in + (* some peer reachability predicates *) + let black_listed (addr, _) = + BlackList.mem addr !black_list in + let white_listed point = + try (PeerMap.by_point point !known_peers).white_listed + with Not_found -> false in + let grey_listed point = + try match (PeerMap.by_point point !known_peers).unreachable_since with + | None -> false | Some t -> Unix.gettimeofday () -. t > 5. + with Not_found -> false in + (* save the cache at exit *) + on_cancel (fun () -> + (* save the known peers cache *) + let json = + Data_encoding.Json.construct peers_file_encoding @@ + (my_gid, + PeerMap.fold + (fun (addr, port) gid source (k, b, w) -> + let infos = match gid, source.connections with + | Some gid, Some (n, t) -> Some (n, t, gid) + | _ -> None in + ((addr, port, infos) :: k, + b, + if source.white_listed then (addr, port) :: w else w)) + !known_peers ([], BlackList.bindings !black_list, [])) + in + Data_encoding.Json.write_file config.peers_file json >>= fun _ -> + debug "(%a) peer cache saved" pp_gid my_gid ; + return ()) ; + (* storage of active and not yet active peers *) + let incoming = ref PointMap.empty in + let connected = ref PeerMap.empty in + (* peer welcoming (accept) loop *) + let welcome () = + match config.incoming_port with + | None -> (* no input port => no welcome worker *) return () + | Some port -> + (* open port for incoming connexions *) + let addr = Unix.inet6_addr_any in + catch + (fun () -> + let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in + LU.(setsockopt main_socket SO_REUSEADDR true) ; + LU.(bind main_socket (ADDR_INET (addr, port))) ; + LU.listen main_socket limits.max_connections ; + return (Some main_socket)) + (fun exn -> + debug "(%a) cannot accept incoming peers (%s)" + pp_gid my_gid (string_of_unix_exn exn) ; + return None)>>= function + | None -> + (* FIXME: run in degraded mode, better exit ? *) + return () + | Some main_socket -> + (* then loop *) + let rec step () = + pick [ (LU.accept main_socket >>= fun (s, a) -> return (Some (s, a))) ; + (cancelation () >>= fun _ -> return None) ] >>= function + | None -> + LU.close main_socket + | Some (socket, addr) -> + match addr with + | LU.ADDR_INET (addr, port) -> + let addr = Ipaddr_unix.of_inet_addr addr in + enqueue_event (Contact ((addr, port), socket)) ; + step () + | _ -> + Lwt.async (fun () -> LU.close socket) ; + step () + in step () + in + (* input maintenance events *) + let too_many_peers = LC.create () in + let too_few_peers = LC.create () in + let new_peer = LC.create () in + let new_contact = LC.create () in + let please_maintain = LC.create () in + let restart_discovery = LC.create () in + (* output maintenance events *) + let just_maintained = LC.create () in + (* maintenance worker, returns when [connections] peers are connected *) + let rec maintenance () = + pick [ (LU.sleep 120. >>= fun () -> return true) ; (* every two minutes *) + (LC.wait please_maintain >>= fun () -> return true) ; (* when asked *) + (LC.wait too_few_peers >>= fun () -> return true) ; (* limits *) + (LC.wait too_many_peers >>= fun () -> return true) ; + (cancelation () >>= fun () -> return false) ] >>= fun continue -> + let rec maintain () = + let n_connected = PeerMap.cardinal !connected in + if n_connected >= limits.expected_connections + && n_connected <= limits.max_connections then + (* end of maintenance when enough users have been reached *) + (LC.broadcast just_maintained () ; + debug "(%a) maintenance step ended" + pp_gid my_gid ; + maintenance ()) + else if n_connected < limits.expected_connections then + (* too few peers, try and contact many peers *) + let contact nb = + let contactable = + (* we sort sources by level (prefered first) *) + PeerMap.bindings !known_peers |> + List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> + (* remove the ones we're connect(ed/ing) to and the blacklisted *) + List.filter (fun (point, gid, source) -> + (not (black_listed point) || source.white_listed) + && not (grey_listed point) + && not (gid = Some my_gid) + && not (PeerMap.mem_by_point point !connected) + && not (PointMap.mem point !incoming) + && match gid with | None -> true | Some gid -> + not (PeerMap.mem_by_gid gid !connected)) in + let rec do_contact_loop strec = + match strec with + | 0, _ -> return true + | _, [] -> return false (* we didn't manage to contact enough peers *) + | nb, ((addr, port), gid, source) :: tl -> + (* we try to open a connection *) + let socket = LU.(socket (match addr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in + let uaddr = Ipaddr_unix.to_inet_addr addr in + catch + (fun () -> + lwt_debug "Trying to connect to %a:%d" + Ipaddr.pp_hum addr port >>= fun () -> + Lwt.pick + [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; + LU.connect socket (LU.ADDR_INET (uaddr, port)) + ] >>= fun () -> + lwt_debug "Connected to %a:%d" + Ipaddr.pp_hum addr port >>= fun () -> + enqueue_event (Contact ((addr, port), socket)) ; + return (nb - 1)) + (fun exn -> + lwt_debug "Connection failed to %a:%d (%s)" + Ipaddr.pp_hum addr port + (string_of_unix_exn exn) >>= fun () -> + (* if we didn't succes, we greylist it *) + let now = Unix.gettimeofday () in + known_peers := + PeerMap.update (addr, port) ?gid + { source with unreachable_since = Some now } + !known_peers ; + LU.close socket >>= fun () -> + return nb) >>= fun nrec -> + do_contact_loop (nrec, tl) + in do_contact_loop (nb, contactable) + in + let to_contact = limits.max_connections - n_connected in + debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ; + contact to_contact >>= function + | true -> (* enough contacts, now wait for connections *) + pick [ (LC.wait new_peer >>= fun _ -> return true) ; + (LU.sleep 1.0 >>= fun () -> return true) ; + (cancelation () >>= fun () -> return false) ] >>= fun continue -> + if continue then maintain () else return () + | false -> (* not enough contacts, ask the pals of our pals, + discover the local network and then wait *) + LC.broadcast restart_discovery () ; + (PeerMap.iter + (fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap)) + !connected ; + pick [ (LC.wait new_peer >>= fun _ -> return true) ; + (LC.wait new_contact >>= fun _ -> return true) ; + (LU.sleep 1.0 >>= fun () -> return true) ; + (cancelation () >>= fun () -> return false) ] >>= fun continue -> + if continue then maintain () else return ()) + else + (* too many peers, start the russian roulette *) + let to_kill = n_connected - limits.max_connections in + debug "(%a) too many connections, will kill %d" pp_gid my_gid to_kill ; + snd (PeerMap.fold + (fun _ _ peer (i, t) -> + if i = 0 then (0, t) + else (i - 1, t >>= fun () -> peer.disconnect ())) + !connected (to_kill, return ())) >>= fun () -> + (* and directly skip to the next maintenance request *) + LC.broadcast just_maintained () ; + debug "(%a) maintenance step ended" pp_gid my_gid ; + maintenance () + in + if continue then maintain () else return () + in + (* select the peers to send on a bootstrap request *) + let bootstrap_peers () = + (* we sort peers by desirability *) + PeerMap.bindings !known_peers |> + List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |> + List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> + (* HERE *) + (* we simply send the first 50 (or less) known peers *) + List.fold_left + (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) + (50, []) |> snd + in + (* main internal event handling worker *) + let rec main () = + pick [ dequeue_event () ; + cancelation () >>= fun () -> return Shutdown ] >>= fun event -> + match event with + | Disconnected peer -> + debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; + (* remove it from the tables *) + connected := PeerMap.remove_by_point peer.point !connected ; + if PeerMap.cardinal !connected < limits.min_connections then + LC.broadcast too_few_peers () ; + incoming := PointMap.remove peer.point !incoming ; + main () + | Connected peer -> + incoming := PointMap.remove peer.point !incoming ; + let update_infos () = + (* we update our knowledge table according to the + reachable address given by the peer *) + match peer.listening_port with + | None -> () + | Some port -> + let point = (fst peer.point, port) in + let update source = + (* delete previous infos about this address / gid *) + known_peers := PeerMap.remove_by_point point !known_peers ; + known_peers := PeerMap.remove_by_gid peer.gid !known_peers ; + (* then assign *) + known_peers := PeerMap.update point ~gid:peer.gid source !known_peers + in update @@ + try match PeerMap.by_gid peer.gid !known_peers with + | { connections = None ; white_listed } -> + { connections = Some (1, Unix.gettimeofday ()) ; + unreachable_since = None ; + white_listed ; + meta = P.init_meta } + | { connections = Some (n, _) ; white_listed } -> + { connections = Some (n + 1, Unix.gettimeofday ()) ; + unreachable_since = None ; + white_listed ; + meta = P.init_meta } + with Not_found -> + { connections = Some (1, Unix.gettimeofday ()) ; + unreachable_since = None ; + white_listed = white_listed point ; + meta = P.init_meta } + in + (* if it's me, it's probably not me *) + if my_gid = peer.gid then begin + debug "(%a) rejected myself from %a:%d" + pp_gid my_gid Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; + (* now that I know my address, I can save this info to + prevent future reconnections to myself *) + update_infos () ; + Lwt.async peer.disconnect + end + (* keep only one connection to each node by checking its gid *) + else if PeerMap.mem_by_gid peer.gid !connected then begin + debug "(%a) rejected already connected peer %a @ %a:%d" + pp_gid my_gid pp_gid peer.gid + Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; + update_infos () ; + Lwt.async peer.disconnect + end else begin + debug "(%a) connected peer %a @ %a:%d" + pp_gid my_gid pp_gid peer.gid + Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; + update_infos () ; + connected := + PeerMap.update peer.point ~gid:peer.gid peer !connected ; + if PeerMap.cardinal !connected > limits.max_connections then + LC.broadcast too_many_peers () ; + LC.broadcast new_peer peer + end ; + main () + | Contact ((addr, port), socket) -> + (* we do not check the credentials at this stage, since they + could change from one connection to the next *) + if PointMap.mem (addr, port) !incoming + || PeerMap.mem_by_point (addr, port) !connected + || BlackList.mem addr !black_list then + LU.close socket >>= fun () -> + main () + else + let canceler = + connect_to_peer config limits my_gid socket (addr, port) enqueue_event white_listed in + debug "(%a) incoming peer at %a:%d" + pp_gid my_gid Ipaddr.pp_hum addr port ; + incoming := PointMap.add (addr, port) canceler !incoming ; + main () + | Bootstrap peer -> + let sample = bootstrap_peers () in + Lwt.async (fun () -> peer.send (Advertise sample)) ; + main () + | Recv (peer, msg) -> + enqueue_msg (peer, msg) ; + main () + | Peers peers -> + List.iter + (fun point -> + if not (PeerMap.mem_by_point point !known_peers) then + let source = + { unreachable_since = None ; + connections = None ; + white_listed = false ; + meta = P.init_meta } in + known_peers := PeerMap.update point source !known_peers ; + LC.broadcast new_contact point) + peers ; + main () + | Shutdown -> + return () + in + (* blacklist filter *) + let rec unblock () = + pick [ (Lwt_unix.sleep 20. >>= fun _ -> return true) ; + (cancelation () >>= fun () -> return false) ] >>= fun continue -> + if continue then + let now = Unix.gettimeofday () in + black_list := BlackList.fold + (fun addr d map -> if d < now then map else BlackList.add addr d map) + !black_list BlackList.empty ; + known_peers := + PeerMap.fold (fun point gid source map -> + let source = + match source.unreachable_since with + | Some t when now -. t < 20. -> source + | _ -> { source with unreachable_since = None } in + PeerMap.update point ?gid source map) + !known_peers PeerMap.empty ; + unblock () + else return () + in + (* launch all workers *) + let welcome = worker (Format.asprintf "(%a) welcome" pp_gid my_gid) welcome cancel in + let maintenance = worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in + let main = worker (Format.asprintf "(%a) reception" pp_gid my_gid) main cancel in + let unblock = worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) unblock cancel in + let discovery_answerer = + let buf = MBytes.create 0x100_000 in + match config.discovery_port with + | Some disco_port -> + let answerer () = + discovery_answerer my_gid disco_port cancelation @@ fun addr port socket -> + (* do not reply to ourselves or conncted peers *) + if not (PeerMap.mem_by_point (addr, port) !connected) + && (try match PeerMap.gid_by_point (addr, port) !known_peers with + | Some gid -> not (PeerMap.mem_by_gid gid !connected) + && not (my_gid = gid) + | None -> true with Not_found -> true) then + (* either reply by a list of peer or connect if we need peers *) + if PeerMap.cardinal !connected >= limits.expected_connections then begin + enqueue_event (Peers [ addr, port ]) ; + send_msg socket buf (Advertise (bootstrap_peers ())) >>= fun _ -> + LU.close socket + end else begin + enqueue_event (Contact ((addr, port), socket)) ; + return () + end + else LU.close socket in + worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) answerer cancel + | _ -> return () in + let discovery_sender = + match config.incoming_port, config.discovery_port with + | Some inco_port, Some disco_port -> + let sender () = + discovery_sender my_gid disco_port inco_port cancelation restart_discovery in + worker (Format.asprintf "(%a) discovery sender" pp_gid my_gid) sender cancel + | _ -> return () in + (* net manipulation callbacks *) + let rec shutdown () = + debug "(%a) starting network shutdown" pp_gid my_gid ; + (* stop accepting clients *) + cancel () >>= fun () -> + (* wait for both workers to end *) + join [ welcome ; main ; maintenance ; unblock ; + discovery_answerer ; discovery_sender ] >>= fun () -> + (* properly shutdown all peers *) + let cancelers = + PeerMap.fold + (fun point _ peer res -> + (peer.disconnect () >>= fun () -> + connected := PeerMap.remove_by_point point !connected ; + return ()) :: res) + !connected @@ + PointMap.fold + (fun point canceler res -> + (canceler () >>= fun () -> + incoming := PointMap.remove point !incoming ; + return ()) :: res) + !incoming @@ [] + in + join cancelers >>= fun () -> + debug "(%a) network shutdown complete" pp_gid my_gid ; + return () + and peers () = + PeerMap.fold (fun _ _ peer r -> peer :: r) !connected [] + and find_peer gid = try Some (PeerMap.by_gid gid !connected) with Not_found -> None + and peer_info (peer : peer) = { + gid = peer.gid ; + addr = fst peer.point ; + port = snd peer.point ; + version = peer.version ; + } + and recv_from () = + dequeue_msg () + and send_to peer msg = + peer.send (Message msg) >>= fun _ -> return () + and try_send peer msg = + Lwt.async (fun () -> peer.send (Message msg)); true + and broadcast msg = + PeerMap.iter + (fun _ _ peer -> + Lwt.async (fun () -> peer.send (Message msg))) + !connected + and blacklist ?(duration = limits.blacklist_time) addr = + let t = Unix.gettimeofday () +. duration in + black_list := BlackList.add addr t !black_list ; + debug "(%a) address %a blacklisted" pp_gid my_gid Ipaddr.pp_hum addr ; + (* we ban this peer, but also all the ones at this address, even + when whitelisted (the blacklist operation wins) *) + known_peers := + PeerMap.fold + (fun ((a, _) as point) gid p map -> + if a = addr then map else PeerMap.update point ?gid p map) + !known_peers PeerMap.empty ; + (* we disconnect all peers at this address sur-le-champ *) + PeerMap.iter + (fun (a, _) _ p -> if addr = a then + Lwt.async (fun () -> p.disconnect ())) + !connected ; + (* and prevent incoming connections *) + PointMap.iter + (fun (a, _) cancel -> if a = addr then Lwt.async cancel) + !incoming + + and whitelist_point point = + let source, gid = try + { (PeerMap.by_point point !known_peers) + with white_listed = true }, + PeerMap.gid_by_point point !known_peers + with Not_found -> { unreachable_since = None ; connections = None ; - white_listed = true } in - List.fold_left - (fun r point -> PeerMap.update point source r) - PeerMap.empty config.known_peers in - let black_list = - BlackList.empty in - known_peers, black_list, my_gid in - match res with - | None -> - let known_peers, black_list, my_gid = init_peers () in - debug "(%a) peer cache initiated" pp_gid my_gid ; - ref known_peers, ref black_list, my_gid - | Some json -> - match Data_encoding.Json.destruct peers_file_encoding json with - | exception _ -> - let known_peers, black_list, my_gid = init_peers () in - debug "(%a) peer cache reset" pp_gid my_gid ; - ref known_peers, ref black_list, my_gid - | (my_gid, (k, b, w)) -> - let white_list = - List.fold_right PointSet.add w PointSet.empty in - let known_peers = - List.fold_left - (fun r (addr, port, infos) -> - match infos with - | None -> - let source = - { unreachable_since = None ; - connections = None ; - white_listed = true } in - PeerMap.update (addr, port) source r - | Some (c, t, gid) -> - let source = - { unreachable_since = None ; - connections = Some (c, t) ; - white_listed = PointSet.mem (addr, port) white_list } in - PeerMap.update (addr, port) ~gid source r) - PeerMap.empty k in - let black_list = - List.fold_left - (fun r (a, d) -> BlackList.add a d r) - BlackList.empty b in - debug "(%a) peer cache loaded" pp_gid my_gid ; - ref known_peers, ref black_list, my_gid - in - (* some peer reachability predicates *) - let black_listed (addr, _) = - BlackList.mem addr !black_list in - let white_listed point = - try (PeerMap.by_point point !known_peers).white_listed - with Not_found -> false in - let grey_listed point = - try match (PeerMap.by_point point !known_peers).unreachable_since with - | None -> false | Some t -> Unix.gettimeofday () -. t > 5. - with Not_found -> false in - (* save the cache at exit *) - on_cancel (fun () -> - (* save the known peers cache *) - let json = - Data_encoding.Json.construct peers_file_encoding @@ - (my_gid, - PeerMap.fold - (fun (addr, port) gid source (k, b, w) -> - let infos = match gid, source.connections with - | Some gid, Some (n, t) -> Some (n, t, gid) - | _ -> None in - ((addr, port, infos) :: k, - b, - if source.white_listed then (addr, port) :: w else w)) - !known_peers ([], BlackList.bindings !black_list, [])) - in - Data_encoding.Json.write_file config.peers_file json >>= fun _ -> - debug "(%a) peer cache saved" pp_gid my_gid ; - return ()) ; - (* storage of active and not yet active peers *) - let incoming = ref PointMap.empty in - let connected = ref PeerMap.empty in - (* peer welcoming (accept) loop *) - let welcome () = - match config.incoming_port with - | None -> (* no input port => no welcome worker *) return () - | Some port -> - (* open port for incoming connexions *) - let addr = Unix.inet6_addr_any in - catch - (fun () -> - let main_socket = LU.(socket PF_INET6 SOCK_STREAM 0) in - LU.(setsockopt main_socket SO_REUSEADDR true) ; - LU.(bind main_socket (ADDR_INET (addr, port))) ; - LU.listen main_socket limits.max_connections ; - return (Some main_socket)) - (fun exn -> - debug "(%a) cannot accept incoming peers (%s)" - pp_gid my_gid (string_of_unix_exn exn) ; - return None)>>= function - | None -> - (* FIXME: run in degraded mode, better exit ? *) - return () - | Some main_socket -> - (* then loop *) - let rec step () = - pick [ (LU.accept main_socket >>= fun (s, a) -> return (Some (s, a))) ; - (cancelation () >>= fun _ -> return None) ] >>= function - | None -> - LU.close main_socket - | Some (socket, addr) -> - match addr with - | LU.ADDR_INET (addr, port) -> - let addr = Ipaddr_unix.of_inet_addr addr in - enqueue_event (Contact ((addr, port), socket)) ; - step () - | _ -> - Lwt.async (fun () -> LU.close socket) ; - step () - in step () - in - (* input maintenance events *) - let too_many_peers = LC.create () in - let too_few_peers = LC.create () in - let new_peer = LC.create () in - let new_contact = LC.create () in - let please_maintain = LC.create () in - let restart_discovery = LC.create () in - (* output maintenance events *) - let just_maintained = LC.create () in - (* maintenance worker, returns when [connections] peers are connected *) - let rec maintenance () = - pick [ (LU.sleep 120. >>= fun () -> return true) ; (* every two minutes *) - (LC.wait please_maintain >>= fun () -> return true) ; (* when asked *) - (LC.wait too_few_peers >>= fun () -> return true) ; (* limits *) - (LC.wait too_many_peers >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - let rec maintain () = - let n_connected = PeerMap.cardinal !connected in - if n_connected >= limits.expected_connections - && n_connected <= limits.max_connections then - (* end of maintenance when enough users have been reached *) - (LC.broadcast just_maintained () ; - debug "(%a) maintenance step ended" - pp_gid my_gid ; - maintenance ()) - else if n_connected < limits.expected_connections then - (* too few peers, try and contact many peers *) - let contact nb = - let contactable = - (* we sort sources by level (prefered first) *) - PeerMap.bindings !known_peers |> - List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> - (* remove the ones we're connect(ed/ing) to and the blacklisted *) - List.filter (fun (point, gid, source) -> - (not (black_listed point) || source.white_listed) - && not (grey_listed point) - && not (gid = Some my_gid) - && not (PeerMap.mem_by_point point !connected) - && not (PointMap.mem point !incoming) - && match gid with | None -> true | Some gid -> - not (PeerMap.mem_by_gid gid !connected)) in - let rec do_contact_loop strec = - match strec with - | 0, _ -> return true - | _, [] -> return false (* we didn't manage to contact enough peers *) - | nb, ((addr, port), gid, source) :: tl -> - (* we try to open a connection *) - let socket = LU.(socket (match addr with Ipaddr.V4 _ -> PF_INET | V6 _ -> PF_INET6) SOCK_STREAM 0) in - let uaddr = Ipaddr_unix.to_inet_addr addr in - catch - (fun () -> - lwt_debug "Trying to connect to %a:%d" - Ipaddr.pp_hum addr port >>= fun () -> - Lwt.pick - [ (Lwt_unix.sleep 2.0 >>= fun _ -> Lwt.fail Not_found) ; - LU.connect socket (LU.ADDR_INET (uaddr, port)) - ] >>= fun () -> - lwt_debug "Connected to %a:%d" - Ipaddr.pp_hum addr port >>= fun () -> - enqueue_event (Contact ((addr, port), socket)) ; - return (nb - 1)) - (fun exn -> - lwt_debug "Connection failed to %a:%d (%s)" - Ipaddr.pp_hum addr port - (string_of_unix_exn exn) >>= fun () -> - (* if we didn't succes, we greylist it *) - let now = Unix.gettimeofday () in - known_peers := - PeerMap.update (addr, port) ?gid - { source with unreachable_since = Some now } - !known_peers ; - LU.close socket >>= fun () -> - return nb) >>= fun nrec -> - do_contact_loop (nrec, tl) - in do_contact_loop (nb, contactable) - in - let to_contact = limits.max_connections - n_connected in - debug "(%a) too few connections (%d)" pp_gid my_gid n_connected ; - contact to_contact >>= function - | true -> (* enough contacts, now wait for connections *) - pick [ (LC.wait new_peer >>= fun _ -> return true) ; - (LU.sleep 1.0 >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then maintain () else return () - | false -> (* not enough contacts, ask the pals of our pals, - discover the local network and then wait *) - LC.broadcast restart_discovery () ; - (PeerMap.iter - (fun _ _ peer -> Lwt.async (fun () -> peer.send Bootstrap)) - !connected ; - pick [ (LC.wait new_peer >>= fun _ -> return true) ; - (LC.wait new_contact >>= fun _ -> return true) ; - (LU.sleep 1.0 >>= fun () -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then maintain () else return ()) - else - (* too many peers, start the russian roulette *) - let to_kill = n_connected - limits.max_connections in - debug "(%a) too many connections, will kill %d" pp_gid my_gid to_kill ; - snd (PeerMap.fold - (fun _ _ peer (i, t) -> - if i = 0 then (0, t) - else (i - 1, t >>= fun () -> peer.disconnect ())) - !connected (to_kill, return ())) >>= fun () -> - (* and directly skip to the next maintenance request *) - LC.broadcast just_maintained () ; - debug "(%a) maintenance step ended" pp_gid my_gid ; - maintenance () + white_listed = true ; + meta = P.init_meta }, + None in + known_peers := PeerMap.update point ?gid source !known_peers + and whitelist peer = + (* we promote this peer to the white list, if reachable *) + match peer.listening_port with + | Some port -> + let point = fst peer.point, port in + whitelist_point point + | None -> () + + and maintain () = + let waiter = LC.wait just_maintained in + LC.broadcast please_maintain () ; + waiter + and roll () = Pervasives.failwith "roll" + and get_meta _gid = None (* TODO: implement *) + and set_meta _gid _meta = () (* TODO: implement *) in - if continue then maintain () else return () - in - (* select the peers to send on a bootstrap request *) - let bootstrap_peers () = - (* we sort peers by desirability *) - PeerMap.bindings !known_peers |> - List.filter (fun ((ip,_),_,_) -> not (Ipaddr.is_private ip)) |> - List.sort (fun (_, _, s1) (_, _, s2) -> compare_sources s1 s2) |> - (* HERE *) - (* we simply send the first 50 (or less) known peers *) - List.fold_left - (fun (n, l) (point, _, _) -> if n = 0 then (n, l) else (n - 1, point :: l)) - (50, []) |> snd - in - (* main internal event handling worker *) - let rec main () = - pick [ dequeue_event () ; - cancelation () >>= fun () -> return Shutdown ] >>= fun event -> - match event with - | Disconnected peer -> - debug "(%a) disconnected peer %a" pp_gid my_gid pp_gid peer.gid ; - (* remove it from the tables *) - connected := PeerMap.remove_by_point peer.point !connected ; - if PeerMap.cardinal !connected < limits.min_connections then - LC.broadcast too_few_peers () ; - incoming := PointMap.remove peer.point !incoming ; - main () - | Connected peer -> - incoming := PointMap.remove peer.point !incoming ; - let update_infos () = - (* we update our knowledge table according to the - reachable address given by the peer *) - match peer.listening_port with - | None -> () - | Some port -> - let point = (fst peer.point, port) in - let update source = - (* delete previous infos about this address / gid *) - known_peers := PeerMap.remove_by_point point !known_peers ; - known_peers := PeerMap.remove_by_gid peer.gid !known_peers ; - (* then assign *) - known_peers := PeerMap.update point ~gid:peer.gid source !known_peers - in update @@ - try match PeerMap.by_gid peer.gid !known_peers with - | { connections = None ; white_listed } -> - { connections = Some (1, Unix.gettimeofday ()) ; - unreachable_since = None ; - white_listed } - | { connections = Some (n, _) ; white_listed } -> - { connections = Some (n + 1, Unix.gettimeofday ()) ; - unreachable_since = None ; - white_listed} - with Not_found -> - { connections = Some (1, Unix.gettimeofday ()) ; - unreachable_since = None ; - white_listed = white_listed point } - in - (* if it's me, it's probably not me *) - if my_gid = peer.gid then begin - debug "(%a) rejected myself from %a:%d" - pp_gid my_gid Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; - (* now that I know my address, I can save this info to - prevent future reconnections to myself *) - update_infos () ; - Lwt.async peer.disconnect - end - (* keep only one connection to each node by checking its gid *) - else if PeerMap.mem_by_gid peer.gid !connected then begin - debug "(%a) rejected already connected peer %a @ %a:%d" - pp_gid my_gid pp_gid peer.gid - Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; - update_infos () ; - Lwt.async peer.disconnect - end else begin - debug "(%a) connected peer %a @ %a:%d" - pp_gid my_gid pp_gid peer.gid - Ipaddr.pp_hum (fst peer.point) (snd peer.point) ; - update_infos () ; - connected := - PeerMap.update peer.point ~gid:peer.gid peer !connected ; - if PeerMap.cardinal !connected > limits.max_connections then - LC.broadcast too_many_peers () ; - LC.broadcast new_peer peer - end ; - main () - | Contact ((addr, port), socket) -> - (* we do not check the credentials at this stage, since they - could change from one connection to the next *) - if PointMap.mem (addr, port) !incoming - || PeerMap.mem_by_point (addr, port) !connected - || BlackList.mem addr !black_list then - LU.close socket >>= fun () -> - main () - else - let canceler = - connect_to_peer config limits my_gid socket (addr, port) enqueue_event white_listed in - debug "(%a) incoming peer at %a:%d" - pp_gid my_gid Ipaddr.pp_hum addr port ; - incoming := PointMap.add (addr, port) canceler !incoming ; - main () - | Bootstrap peer -> - let sample = bootstrap_peers () in - Lwt.async (fun () -> peer.send (Advertise sample)) ; - main () - | Recv (peer, message) -> - enqueue_msg (peer, message) ; - main () - | Peers peers -> - List.iter - (fun point -> - if not (PeerMap.mem_by_point point !known_peers) then - let source = - { unreachable_since = None ; - connections = None ; - white_listed = false } in - known_peers := PeerMap.update point source !known_peers ; - LC.broadcast new_contact point) - peers ; - main () - | Shutdown -> - return () - in - (* blacklist filter *) - let rec unblock () = - pick [ (Lwt_unix.sleep 20. >>= fun _ -> return true) ; - (cancelation () >>= fun () -> return false) ] >>= fun continue -> - if continue then - let now = Unix.gettimeofday () in - black_list := BlackList.fold - (fun addr d map -> if d < now then map else BlackList.add addr d map) - !black_list BlackList.empty ; - known_peers := - PeerMap.fold (fun point gid source map -> - let source = - match source.unreachable_since with - | Some t when now -. t < 20. -> source - | _ -> { source with unreachable_since = None } in - PeerMap.update point ?gid source map) - !known_peers PeerMap.empty ; - unblock () - else return () - in - (* launch all workers *) - let welcome = worker (Format.asprintf "(%a) welcome" pp_gid my_gid) welcome cancel in - let maintenance = worker (Format.asprintf "(%a) maintenance" pp_gid my_gid) maintenance cancel in - let main = worker (Format.asprintf "(%a) reception" pp_gid my_gid) main cancel in - let unblock = worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) unblock cancel in - let discovery_answerer = - match config.discovery_port with - | Some disco_port -> - let answerer () = - discovery_answerer my_gid disco_port cancelation @@ fun addr port socket -> - (* do not reply to ourselves or conncted peers *) - if not (PeerMap.mem_by_point (addr, port) !connected) - && (try match PeerMap.gid_by_point (addr, port) !known_peers with - | Some gid -> not (PeerMap.mem_by_gid gid !connected) - && not (my_gid = gid) - | None -> true with Not_found -> true) then - (* either reply by a list of peer or connect if we need peers *) - if PeerMap.cardinal !connected >= limits.expected_connections then begin - enqueue_event (Peers [ addr, port ]) ; - send_packet socket (Advertise (bootstrap_peers ())) >>= fun _ -> - LU.close socket - end else begin - enqueue_event (Contact ((addr, port), socket)) ; - return () - end - else LU.close socket in - worker (Format.asprintf "(%a) discovery answerer" pp_gid my_gid) answerer cancel - | _ -> return () in - let discovery_sender = - match config.incoming_port, config.discovery_port with - | Some inco_port, Some disco_port -> - let sender () = - discovery_sender my_gid disco_port inco_port cancelation restart_discovery in - worker (Format.asprintf "(%a) discovery sender" pp_gid my_gid) sender cancel - | _ -> return () in - (* net manipulation callbacks *) - let rec shutdown () = - debug "(%a) starting network shutdown" pp_gid my_gid ; - (* stop accepting clients *) - cancel () >>= fun () -> - (* wait for both workers to end *) - join [ welcome ; main ; maintenance ; unblock ; - discovery_answerer ; discovery_sender ] >>= fun () -> - (* properly shutdown all peers *) - let cancelers = - PeerMap.fold - (fun point _ peer res -> - (peer.disconnect () >>= fun () -> - connected := PeerMap.remove_by_point point !connected ; - return ()) :: res) - !connected @@ - PointMap.fold - (fun point canceler res -> - (canceler () >>= fun () -> - incoming := PointMap.remove point !incoming ; - return ()) :: res) - !incoming @@ [] - in - join cancelers >>= fun () -> - debug "(%a) network shutdown complete" pp_gid my_gid ; - return () - and peers () = - PeerMap.fold (fun _ _ peer r -> peer :: r) !connected [] - and peer_info peer = - fst peer.point, snd peer.point, peer.version - and recv_from () = - dequeue_msg () - and send_to (peer, msg) = - peer.send (Message msg) >>= fun _ -> return () - and push (peer, msg) = - Lwt.async (fun () -> peer.send (Message msg)) - and broadcast msg = - PeerMap.iter - (fun _ _ peer -> - Lwt.async (fun () -> peer.send (Message msg))) - !connected - and blacklist ?(duration = limits.blacklist_time) addr = - let t = Unix.gettimeofday () +. duration in - black_list := BlackList.add addr t !black_list ; - debug "(%a) address %a blacklisted" pp_gid my_gid Ipaddr.pp_hum addr ; - (* we ban this peer, but also all the ones at this address, even - when whitelisted (the blacklist operation wins) *) - known_peers := - PeerMap.fold - (fun ((a, _) as point) gid p map -> - if a = addr then map else PeerMap.update point ?gid p map) - !known_peers PeerMap.empty ; - (* we disconnect all peers at this address sur-le-champ *) - PeerMap.iter - (fun (a, _) _ p -> if addr = a then - Lwt.async (fun () -> p.disconnect ())) - !connected ; - (* and prevent incoming connections *) - PointMap.iter - (fun (a, _) cancel -> if a = addr then Lwt.async cancel) - !incoming + let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; + blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta } in + (* main thread, returns after first successful maintenance *) + maintain () >>= fun () -> + debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; + return net - and whitelist_point point = - let source, gid = try - { (PeerMap.by_point point !known_peers) - with white_listed = true }, - PeerMap.gid_by_point point !known_peers - with Not_found -> - { unreachable_since = None ; - connections = None ; - white_listed = true }, - None in - known_peers := PeerMap.update point ?gid source !known_peers - and whitelist peer = - (* we promote this peer to the white list, if reachable *) - match peer.listening_port with - | Some port -> - let point = fst peer.point, port in - whitelist_point point - | None -> () - - and maintain () = - let waiter = LC.wait just_maintained in - LC.broadcast please_maintain () ; - waiter - and roll () = Pervasives.failwith "roll" - in - let net = { shutdown ; peers ; recv_from ; send_to ; push ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info } in - (* main thread, returns after first successful maintenance *) - maintain () >>= fun () -> - debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; - return net - -let faked_network = - let infinity, wakeup = Lwt.wait () in - let shutdown () = - Lwt.wakeup_exn wakeup Lwt_stream.Empty; - Lwt.return_unit in - let peers () = [] in - let recv_from () = infinity in - let send_to _ = Lwt.return_unit in - let push _ = () in - let broadcast _ = () in - let blacklist ?duration _ = ignore duration ; () in - let whitelist _ = () in - let maintain () = Lwt.return_unit in - let roll () = Lwt.return_unit in - let peer_info _ = assert false in - { shutdown ; peers ; recv_from ; send_to ; push ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info } + let faked_network = + let infinity, wakeup = Lwt.wait () in + let shutdown () = + Lwt.wakeup_exn wakeup Lwt_stream.Empty; + Lwt.return_unit in + let peers () = [] in + let find_peer _ = None in + let recv_from () = infinity in + let send_to _ _ = Lwt.return_unit in + let try_send _ _ = true in + let broadcast _ = () in + let blacklist ?duration _ = ignore duration ; () in + let whitelist _ = () in + let maintain () = Lwt.return_unit in + let roll () = Lwt.return_unit in + let peer_info _ = assert false in + let get_meta _ = None in + let set_meta _ _ = () in + { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; + blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta } -(* Plug toplevel functions to callback calls. *) -let shutdown net = net.shutdown () -let peers net = net.peers () -let peer_info peer net = net.peer_info peer -let recv net = net.recv_from () -let send (peer, msg) net = net.send_to (peer, msg) -let push peer net = net.push peer -let broadcast msg net = net.broadcast msg -let maintain net = net.maintain () -let roll net = net.roll () -let blacklist ?duration peer net = net.blacklist ?duration peer -let whitelist peer net = net.whitelist peer + (* Plug toplevel functions to callback calls. *) + let shutdown net = net.shutdown () + let peers net = net.peers () + let find_peer net gid = net.find_peer gid + let peer_info net peer = net.peer_info peer + let recv net = net.recv_from () + let send net peer msg = net.send_to peer msg + let try_send net peer = net.try_send peer + let broadcast net msg = net.broadcast msg + let maintain net = net.maintain () + let roll net = net.roll () + let blacklist _net _gid = () + let whitelist _net _gid = () + let get_meta net gid = net.get_meta gid + let set_meta net gid meta = net.set_meta gid meta +end + diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index fcebfaafd..5d517a084 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -7,35 +7,27 @@ (* *) (**************************************************************************) -(** A P2P network *) -type net - -(** A faked p2p layer, which do not initiate any connection - nor open any listening socket. *) -val faked_network : net - (** A peer connection address *) type addr = Ipaddr.t (** A peer connection port *) type port = int -(** A protocol version tag: (name, major, minor) *) -type version = string * int * int +(** A p2p protocol version *) +type version = { + name : string ; + major : int ; + minor : int ; +} (** Network configuration *) type config = { (** Tells if incoming connections accepted, precising the TCP port - on which the peer can be reached *) + on which the peer can be reached *) incoming_port : port option ; (** Tells if peers should be discovered automatically on the local - network, precising the UDP port to use *) + network, precising the UDP port to use *) discovery_port : port option ; - (** High level protocol(s) talked by the peer. When two peers - initiate a connection, they exchange their list of supported - versions. The chosen one, if any, is the maximum common one (in - lexicographic order) *) - supported_versions : version list ; (** List of hard-coded known peers to bootstrap the network from *) known_peers : (addr * port) list ; (** The path to the JSON file where the peer cache is loaded / stored *) @@ -47,7 +39,7 @@ type config = { (** Network capacities *) type limits = { - (** Maximum length in bytes of network frames *) + (** Maximum length in bytes of network messages' payload *) max_packet_size : int ; (** Delay after which a non responding peer is considered dead *) peer_answer_timeout : float ; @@ -61,42 +53,94 @@ type limits = { blacklist_time : float ; } -(** Main network initialisation function *) -val bootstrap : config -> limits -> net Lwt.t +type 'msg msg_encoding = Encoding : { + tag: int ; + encoding: 'a Data_encoding.t ; + wrap: 'a -> 'msg ; + unwrap: 'msg -> 'a option ; + max_length: int option ; + } -> 'msg msg_encoding -(** A maintenance operation : try and reach the ideal number of peers *) -val maintain : net -> unit Lwt.t +module type NET_PARAMS = sig + type meta (** Type of metadata associated to an identity *) + type msg (** Type of message used by higher layers *) -(** Voluntarily drop some peers and replace them by new buddies *) -val roll : net -> unit Lwt.t + val msg_encodings : msg msg_encoding list -(** Close all connections properly *) -val shutdown : net -> unit Lwt.t + val init_meta : meta + val score_enc : meta Data_encoding.t + val score: meta -> float -(** A connection to a peer *) -type peer + (** High level protocol(s) talked by the peer. When two peers + initiate a connection, they exchange their list of supported + versions. The chosen one, if any, is the maximum common one (in + lexicographic order) *) + val supported_versions : version list +end -(** Access the domain of active peers *) -val peers : net -> peer list +module Make (P : NET_PARAMS) : sig + type net -(** Access the info of an active peer, if available *) -val peer_info : peer -> net -> addr * port * version + (** A faked p2p layer, which do not initiate any connection + nor open any listening socket. *) + val faked_network : net -(** Wait for a Netbits.frame from any peer in the network *) -val recv : net -> (peer * Netbits.frame) Lwt.t + (** Main network initialisation function *) + val bootstrap : config:config -> limits:limits -> net Lwt.t -(** Send a Netbits.frame to a peer and wait for it to be in the tube *) -val send : peer * Netbits.frame -> net -> unit Lwt.t + (** A maintenance operation : try and reach the ideal number of peers *) + val maintain : net -> unit Lwt.t -(** Send a Netbits.frame to a peer asynchronously *) -val push : peer * Netbits.frame -> net -> unit + (** Voluntarily drop some peers and replace them by new buddies *) + val roll : net -> unit Lwt.t -(** Send a Netbits.frame to all peers *) -val broadcast : Netbits.frame -> net -> unit + (** Close all connections properly *) + val shutdown : net -> unit Lwt.t -(** Shutdown the connection to all peers at this address and stop the - communications with this machine for [duration] seconds *) -val blacklist : ?duration:float -> addr -> net -> unit + (** A connection to a peer *) + type peer -(** Keep a connection to this pair as often as possible *) -val whitelist : peer -> net -> unit + (** A global identifier for a peer, a.k.a. an identity *) + type gid + + (** Access the domain of active peers *) + val peers : net -> peer list + + (** Return the active peer with identity [gid] *) + val find_peer : net -> gid -> peer option + + type peer_info = { + gid : gid; + addr : addr; + port : port; + version : version; + } + + (** Access the info of an active peer, if available *) + val peer_info : net -> peer -> peer_info + + (** Accessors for meta information about a peer *) + val get_meta : net -> gid -> P.meta option + val set_meta : net -> gid -> P.meta -> unit + + (** Wait for a payload from any peer in the network *) + val recv : net -> (peer * P.msg) Lwt.t + + (** Send a payload to a peer and wait for it to be in the tube *) + val send : net -> peer -> P.msg -> unit Lwt.t + + (** Send a payload to a peer without waiting for the result. Return + [true] if the message can be enqueued in the peer's output queue + or [false] otherwise. *) + val try_send : net -> peer -> P.msg -> bool + + (** Send a payload to all peers *) + val broadcast : net -> P.msg -> unit + + (** Shutdown the connection to all peers at this address and stop the + communications with this machine for [duration] seconds *) + val blacklist : net -> gid -> unit + + (** Keep a connection to this pair as often as possible *) + val whitelist : net -> gid -> unit +end diff --git a/src/node/shell/discoverer.ml b/src/node/shell/discoverer.ml index a210b6508..86af9ca0a 100644 --- a/src/node/shell/discoverer.ml +++ b/src/node/shell/discoverer.ml @@ -7,6 +7,8 @@ (* *) (**************************************************************************) +module P2p = Netparams + type worker = { shutdown: unit -> unit Lwt.t; } @@ -15,7 +17,7 @@ let create_worker p2p state = let cancelation, cancel, _on_cancel = Lwt_utils.canceler () in - let broadcast m = P2p.broadcast (Messages.to_frame m) p2p in + let broadcast m = P2p.broadcast p2p m in let discovery_worker = let rec worker_loop () = diff --git a/src/node/shell/discoverer.mli b/src/node/shell/discoverer.mli index fbb75e336..35e11b81e 100644 --- a/src/node/shell/discoverer.mli +++ b/src/node/shell/discoverer.mli @@ -9,6 +9,6 @@ type worker -val create_worker: P2p.net -> State.t -> worker +val create_worker: Netparams.net -> State.t -> worker val shutdown: worker -> unit Lwt.t diff --git a/src/node/shell/messages.ml b/src/node/shell/messages.ml index 22e32f68c..0c3904663 100644 --- a/src/node/shell/messages.ml +++ b/src/node/shell/messages.ml @@ -7,11 +7,9 @@ (* *) (**************************************************************************) -open Netbits - type net_id = Store.net_id -type message = +type t = | Discover_blocks of net_id * Block_hash.t list (* Block locator *) | Block_inventory of net_id * Block_hash.t list @@ -28,65 +26,53 @@ type message = | Get_protocols of Protocol_hash.t list | Protocol of MBytes.t +let encoding = + let open Data_encoding in + let case ?max_length ~tag encoding unwrap wrap = + P2p.Encoding { tag; encoding; wrap; unwrap; max_length } + in [ + case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding)) + (function + | Discover_blocks (Net genesis_bh, bhs) -> Some (genesis_bh, bhs) + | _ -> None) + (fun (genesis_bh, bhs) -> Discover_blocks (Net genesis_bh, bhs)); + case ~tag:0x11 (tup2 Block_hash.encoding (list Block_hash.encoding)) + (function + | Block_inventory (Net genesis_bh, bhs) -> Some (genesis_bh, bhs) + | _ -> None) + (fun (genesis_bh, bhs) -> Block_inventory (Net genesis_bh, bhs)); -let to_frame msg = + case ~tag:0x12 (list Block_hash.encoding) + (function + | Get_blocks bhs -> Some bhs + | _ -> None) + (fun bhs -> Get_blocks bhs); + case ~tag:0x13 Data_encoding.bytes + (function Block b -> Some b | _ -> None) + (fun b -> Block b); - let bh h = B (Block_hash.to_bytes h) in - let oph h = B (Operation_hash.to_bytes h) in - let ph h = B (Protocol_hash.to_bytes h) in - match msg with + case ~tag:0x20 Block_hash.encoding + (function Current_operations (Net genesis_bh) -> Some genesis_bh | _ -> None) + (fun genesis_bh -> Current_operations (Net genesis_bh)); + case ~tag:0x21 (tup2 Block_hash.encoding (list Operation_hash.encoding)) + (function Operation_inventory ((Net genesis_bh), ops) -> Some (genesis_bh, ops) | _ -> None) + (fun (genesis_bh, ops) -> Operation_inventory (Net genesis_bh, ops)); - | Discover_blocks (Net netid, blocks) -> - [ S 2100 ; bh netid ; F (List.map bh blocks) ] - | Block_inventory (Net netid, blocks) -> - [ S 2101 ; bh netid ; F (List.map bh blocks) ] - | Get_blocks blocks -> - [ S 2102 ; F (List.map bh blocks) ] - | Block b -> - [ S 2103 ; B b ] - - | Current_operations (Net net_id) -> - [ S 2700 ; bh net_id ] - | Operation_inventory (Net net_id, ops) -> - [ S 2701 ; bh net_id ; F (List.map oph ops) ] - | Get_operations ops -> - [ S 2702 ; F (List.map oph ops) ] - | Operation b -> - [ S 2703 ; B b ] - - | Get_protocols protos -> - [ S 2800 ; F (List.map ph protos) ] - | Protocol p -> - [ S 2801 ; B p ] - -let from_frame msg = - - let bh = function B s -> (Block_hash.of_bytes s) | _ -> invalid_arg "bh" in - let oph = function B s -> (Operation_hash.of_bytes s) | _ -> invalid_arg "oph" in - let ph = function B s -> (Protocol_hash.of_bytes s) | _ -> invalid_arg "ph" in - let net = function netid -> Store.Net (Block_hash.of_bytes netid) in - try match msg with - - | [ S 2100 ; B netid ; F blocks ] -> - Some (Discover_blocks (net netid, List.map bh blocks)) - | [ S 2101 ; B netid ; F blocks ] -> - Some (Block_inventory (net netid, List.map bh blocks)) - | [ S 2102 ; F blocks ] -> - Some (Get_blocks (List.map bh blocks)) - | [ S 2103 ; B bh ] -> Some (Block bh) - | [ S 2700 ; B netid ] -> - Some (Current_operations (net netid)) - | [ S 2701 ; B netid ; F ops ] -> - Some (Operation_inventory (net netid, List.map oph ops)) - | [ S 2702 ; F ops ] -> - Some (Get_operations (List.map oph ops)) - | [ S 2703 ; B contents ] -> Some (Operation contents) - - | [ S 2800 ; F protos ] -> Some (Get_protocols (List.map ph protos)) - - | [ S 2801 ; B contents ] -> Some (Protocol contents) - - | _ -> None - - with Invalid_argument _ -> None + case ~tag:0x22 (list Operation_hash.encoding) + (function + | Get_operations ops -> Some ops + | _ -> None) + (fun ops -> Get_operations ops); + case ~tag:0x23 Data_encoding.bytes + (function Operation o -> Some o | _ -> None) + (fun o -> Operation o); + case ~tag:0x32 (list Protocol_hash.encoding) + (function + | Get_protocols protos -> Some protos + | _ -> None) + (fun protos -> Get_protocols protos); + case ~tag:0x33 Data_encoding.bytes + (function Protocol proto -> Some proto | _ -> None) + (fun proto -> Protocol proto); +] diff --git a/src/node/shell/messages.mli b/src/node/shell/messages.mli index 2c3df42cc..b4e976539 100644 --- a/src/node/shell/messages.mli +++ b/src/node/shell/messages.mli @@ -8,7 +8,7 @@ (**************************************************************************) (** High level messages *) -type message = +type t = | Discover_blocks of Store.net_id * Block_hash.t list (* Block locator *) | Block_inventory of Store.net_id * Block_hash.t list @@ -25,9 +25,4 @@ type message = | Get_protocols of Protocol_hash.t list | Protocol of MBytes.t - -(** Converts a high level message to a network frame *) -val to_frame: message -> Netbits.frame - -(** Tries and convert a network frame to a high level message *) -val from_frame: Netbits.frame -> message option +val encoding : t P2p.msg_encoding list diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index 49872995c..e43806b69 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -7,12 +7,12 @@ (* *) (**************************************************************************) +module P2p = Netparams + open Logging.Node.Worker let (>|=) = Lwt.(>|=) -let supported_versions = ["TEZOS", 0, 0] - let inject_operation validator ?force bytes = let t = match Store.Operation.of_bytes bytes with @@ -194,18 +194,17 @@ type t = { let request_operations net _net_id operations = (* TODO improve the lookup strategy. For now simply broadcast the request to all our neighbours. *) - P2p.broadcast - (Messages.(to_frame (Get_operations operations))) net + P2p.broadcast net (Get_operations operations) let request_blocks net _net_id blocks = (* TODO improve the lookup strategy. For now simply broadcast the request to all our neighbours. *) - P2p.broadcast (Messages.(to_frame (Get_blocks blocks))) net + P2p.broadcast net (Get_blocks blocks) let request_protocols net protocols = (* TODO improve the lookup strategy. For now simply broadcast the request to all our neighbours. *) - P2p.broadcast (Messages.(to_frame (Get_protocols protocols))) net + P2p.broadcast net (Get_protocols protocols) let init_p2p net_params = match net_params with @@ -244,21 +243,12 @@ let create lwt_log_info "starting worker..." >>= fun () -> let worker = - let handle_msg peer frame = - lwt_log_info "received message" >>= fun () -> - match Messages.from_frame frame with - | None -> - lwt_warn "can't parse message" >>= fun () -> - (* FIXME 60 second ? parameter... and Log_notice *) - let addr, _, _ = P2p.peer_info peer p2p in - P2p.blacklist ~duration:60. addr p2p ; - Lwt.return_unit - | Some msg -> - process state validator msg >>= fun msgs -> - List.iter - (fun msg -> P2p.push (peer, Messages.to_frame msg) p2p) - msgs; - Lwt.return_unit + let handle_msg peer msg = + process state validator msg >>= fun msgs -> + List.iter + (fun msg -> ignore @@ P2p.try_send p2p peer msg) + msgs; + Lwt.return_unit in let rec worker_loop () = P2p.recv p2p >>= fun (peer, msg) -> diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index d5368fa7c..c771de089 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -9,8 +9,6 @@ type t -val supported_versions: P2p.version list - val create: genesis:Store.genesis -> store_root:string -> diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index 25a3b1734..ebf417302 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -7,6 +7,8 @@ (* *) (**************************************************************************) +module P2p = Netparams + open Logging.Node.Prevalidator let preapply @@ -95,9 +97,7 @@ let create p2p net = Lwt.return_unit in let broadcast_operation ops = - P2p.broadcast - Messages.(to_frame @@ Operation_inventory (State.Net.id net, ops)) - p2p in + P2p.broadcast p2p (Operation_inventory (State.Net.id net, ops)) in let handle_unprocessed () = if Operation_hash_set.is_empty !unprocessed then diff --git a/src/node/shell/prevalidator.mli b/src/node/shell/prevalidator.mli index 7b5b3d78f..d9194eb7d 100644 --- a/src/node/shell/prevalidator.mli +++ b/src/node/shell/prevalidator.mli @@ -26,6 +26,8 @@ *) +module P2p = Netparams + type t (** Creation and destruction of a "prevalidation" worker. *) diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 7ce9373cf..b330a302a 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -7,6 +7,8 @@ (* *) (**************************************************************************) +module P2p = Netparams + open Logging.Node.Validator type worker = { @@ -43,7 +45,7 @@ let test_validator w = w.test_validator () let fetch_block v = v.fetch_block let prevalidator v = v.prevalidator -let broadcast w m = P2p.broadcast (Messages.to_frame m) w.p2p +let broadcast w m = P2p.broadcast w.p2p m (** Current block computation *) diff --git a/src/node/shell/validator.mli b/src/node/shell/validator.mli index 2a43bc734..ca45a78e0 100644 --- a/src/node/shell/validator.mli +++ b/src/node/shell/validator.mli @@ -9,6 +9,8 @@ type worker +module P2p = Netparams + val create_worker: P2p.net -> State.t -> worker val shutdown: worker -> unit Lwt.t diff --git a/src/node_main.ml b/src/node_main.ml index f1b63796b..8c52837d4 100644 --- a/src/node_main.ml +++ b/src/node_main.ml @@ -288,7 +288,6 @@ let init_node () = { incoming_port = Globals.incoming_port#get ; discovery_port = if Globals.discovery_port#get then Some 7732 else None ; - supported_versions = Node.supported_versions ; known_peers = Globals.bootstrap_peers#get ; peers_file = Globals.peers_file#get ; closed_network = Globals.closed_network#get }