Shell/P2p: minor renaming.
This commit is contained in:
parent
cbfab86f25
commit
6afcc1ecdd
@ -7,6 +7,12 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
module LU = Lwt_unix
|
||||
module LC = Lwt_condition
|
||||
open Lwt
|
||||
open Lwt_utils
|
||||
open Logging.Net
|
||||
|
||||
(* public types *)
|
||||
type addr = Ipaddr.t
|
||||
type port = int
|
||||
@ -47,110 +53,12 @@ type gid = string
|
||||
|
||||
let gid_length = 16
|
||||
|
||||
type 'msg msg_encoding = Encoding : {
|
||||
tag: int ;
|
||||
encoding: 'a Data_encoding.t ;
|
||||
wrap: 'a -> 'msg ;
|
||||
unwrap: 'msg -> 'a option ;
|
||||
max_length: int option ;
|
||||
} -> 'msg msg_encoding
|
||||
|
||||
module type NET_PARAMS = sig
|
||||
type meta (** Type of metadata associated to an identity *)
|
||||
type msg (** Type of message used by higher layers *)
|
||||
|
||||
val msg_encodings : msg msg_encoding list
|
||||
|
||||
val init_meta : meta
|
||||
val score_enc : meta Data_encoding.t
|
||||
val score: meta -> float
|
||||
|
||||
(** High level protocol(s) talked by the peer. When two peers
|
||||
initiate a connection, they exchange their list of supported
|
||||
versions. The chosen one, if any, is the maximum common one (in
|
||||
lexicographic order) *)
|
||||
val supported_versions : version list
|
||||
end
|
||||
|
||||
module type S = sig
|
||||
include NET_PARAMS
|
||||
|
||||
type net
|
||||
|
||||
(** A faked p2p layer, which do not initiate any connection
|
||||
nor open any listening socket. *)
|
||||
val faked_network : net
|
||||
|
||||
(** Main network initialisation function *)
|
||||
val bootstrap : config:config -> limits:limits -> net Lwt.t
|
||||
|
||||
(** A maintenance operation : try and reach the ideal number of peers *)
|
||||
val maintain : net -> unit Lwt.t
|
||||
|
||||
(** Voluntarily drop some peers and replace them by new buddies *)
|
||||
val roll : net -> unit Lwt.t
|
||||
|
||||
(** Close all connections properly *)
|
||||
val shutdown : net -> unit Lwt.t
|
||||
|
||||
(** A connection to a peer *)
|
||||
type peer
|
||||
|
||||
(** Access the domain of active peers *)
|
||||
val peers : net -> peer list
|
||||
|
||||
(** Return the active peer with identity [gid] *)
|
||||
val find_peer : net -> gid -> peer option
|
||||
|
||||
type peer_info = {
|
||||
gid : gid;
|
||||
addr : addr;
|
||||
port : port;
|
||||
version : version;
|
||||
}
|
||||
|
||||
(** Access the info of an active peer, if available *)
|
||||
val peer_info : net -> peer -> peer_info
|
||||
|
||||
(** Accessors for meta information about a peer *)
|
||||
val get_meta : net -> gid -> meta option
|
||||
val set_meta : net -> gid -> meta -> unit
|
||||
|
||||
(** Wait for a payload from any peer in the network *)
|
||||
val recv : net -> (peer * msg) Lwt.t
|
||||
|
||||
(** Send a payload to a peer and wait for it to be in the tube *)
|
||||
val send : net -> peer -> msg -> unit Lwt.t
|
||||
|
||||
(** Send a payload to a peer without waiting for the result. Return
|
||||
[true] if the message can be enqueued in the peer's output queue
|
||||
or [false] otherwise. *)
|
||||
val try_send : net -> peer -> msg -> bool
|
||||
|
||||
(** Send a payload to all peers *)
|
||||
val broadcast : net -> msg -> unit
|
||||
|
||||
(** Shutdown the connection to all peers at this address and stop the
|
||||
communications with this machine for [duration] seconds *)
|
||||
val blacklist : net -> gid -> unit
|
||||
|
||||
(** Keep a connection to this pair as often as possible *)
|
||||
val whitelist : net -> gid -> unit
|
||||
end
|
||||
|
||||
module Make (P: NET_PARAMS) = struct
|
||||
module LU = Lwt_unix
|
||||
module LC = Lwt_condition
|
||||
open Lwt
|
||||
open Lwt_utils
|
||||
open Logging.Net
|
||||
|
||||
let pp_gid ppf gid =
|
||||
let pp_gid ppf gid =
|
||||
Format.pp_print_string ppf (Hex_encode.hex_encode gid)
|
||||
|
||||
(* the common version for a pair of peers, if any, is the maximum one,
|
||||
(* the common version for a pair of peers, if any, is the maximum one,
|
||||
in lexicographic order *)
|
||||
let common_version la lb =
|
||||
let common_version la lb =
|
||||
let la = List.sort (fun l r -> compare r l) la in
|
||||
let lb = List.sort (fun l r -> compare r l) lb in
|
||||
let rec find = function
|
||||
@ -161,10 +69,10 @@ module Make (P: NET_PARAMS) = struct
|
||||
else find (la, tb)
|
||||
in find (la, lb)
|
||||
|
||||
(* A net point (address x port). *)
|
||||
type point = addr * port
|
||||
(* A net point (address x port). *)
|
||||
type point = addr * port
|
||||
|
||||
let point_encoding =
|
||||
let point_encoding =
|
||||
let open Data_encoding in
|
||||
let open Ipaddr in
|
||||
conv
|
||||
@ -181,6 +89,38 @@ module Make (P: NET_PARAMS) = struct
|
||||
(req "addr" string)
|
||||
(req "port" int16))
|
||||
|
||||
type 'msg encoding = Encoding : {
|
||||
tag: int ;
|
||||
encoding: 'a Data_encoding.t ;
|
||||
wrap: 'a -> 'msg ;
|
||||
unwrap: 'msg -> 'a option ;
|
||||
max_length: int option ;
|
||||
} -> 'msg encoding
|
||||
|
||||
module type PARAMS = sig
|
||||
|
||||
(** Type of message used by higher layers *)
|
||||
type msg
|
||||
|
||||
val encodings : msg encoding list
|
||||
|
||||
(** Type of metadata associated to an identity *)
|
||||
type metadata
|
||||
|
||||
val initial_metadata : metadata
|
||||
val metadata_encoding : metadata Data_encoding.t
|
||||
val score : metadata -> float
|
||||
|
||||
(** High level protocol(s) talked by the peer. When two peers
|
||||
initiate a connection, they exchange their list of supported
|
||||
versions. The chosen one, if any, is the maximum common one (in
|
||||
lexicographic order) *)
|
||||
val supported_versions : version list
|
||||
|
||||
end
|
||||
|
||||
module Make (P: PARAMS) = struct
|
||||
|
||||
(* Low-level network protocol packets (internal). The protocol is
|
||||
completely symmetrical and asynchronous. First both peers must
|
||||
present their credentials with a [Connect] packet, then any
|
||||
@ -236,7 +176,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
(function Bootstrap -> Some () | _ -> None)
|
||||
(fun () -> Bootstrap);
|
||||
] @
|
||||
ListLabels.map P.msg_encodings ~f:begin function Encoding { tag; encoding; wrap; unwrap } ->
|
||||
ListLabels.map P.encodings ~f:begin function Encoding { tag; encoding; wrap; unwrap } ->
|
||||
case ~tag encoding
|
||||
(function Message msg -> unwrap msg | _ -> None)
|
||||
(fun msg -> Message (wrap msg))
|
||||
@ -250,7 +190,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
| 3 -> Some 0
|
||||
| 4 -> Some (1 + 1000 * 17) (* tag + 1000 * max (point size) *)
|
||||
| 5 -> Some 0
|
||||
| n -> ListLabels.fold_left P.msg_encodings ~init:None ~f:begin fun a -> function
|
||||
| n -> ListLabels.fold_left P.encodings ~init:None ~f:begin fun a -> function
|
||||
Encoding { tag; max_length } -> if tag = n then max_length else a
|
||||
end
|
||||
|
||||
@ -359,8 +299,8 @@ module Make (P: NET_PARAMS) = struct
|
||||
peers : unit -> peer list ;
|
||||
find_peer : gid -> peer option ;
|
||||
peer_info : peer -> peer_info ;
|
||||
set_meta : gid -> P.meta -> unit ;
|
||||
get_meta : gid -> P.meta option ;
|
||||
set_metadata : gid -> P.metadata -> unit ;
|
||||
get_metadata : gid -> P.metadata option ;
|
||||
}
|
||||
|
||||
(* The (internal) type of network events, those dispatched from peer
|
||||
@ -639,7 +579,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
unreachable_since : float option;
|
||||
connections : (int * float) option ;
|
||||
white_listed : bool ;
|
||||
meta : P.meta ;
|
||||
meta : P.metadata ;
|
||||
}
|
||||
|
||||
(* Ad hoc comparison on sources such as good source < bad source *)
|
||||
@ -789,7 +729,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
let source = { unreachable_since = None ;
|
||||
connections = None ;
|
||||
white_listed = true ;
|
||||
meta = P.init_meta ;
|
||||
meta = P.initial_metadata ;
|
||||
}
|
||||
in
|
||||
List.fold_left
|
||||
@ -821,14 +761,14 @@ module Make (P: NET_PARAMS) = struct
|
||||
{ unreachable_since = None ;
|
||||
connections = None ;
|
||||
white_listed = true ;
|
||||
meta = P.init_meta ; } in
|
||||
meta = P.initial_metadata ; } in
|
||||
PeerMap.update (addr, port) source r
|
||||
| Some (c, t, gid) ->
|
||||
let source =
|
||||
{ unreachable_since = None ;
|
||||
connections = Some (c, t) ;
|
||||
white_listed = PointSet.mem (addr, port) white_list ;
|
||||
meta = P.init_meta ; } in
|
||||
meta = P.initial_metadata ; } in
|
||||
PeerMap.update (addr, port) ~gid source r)
|
||||
PeerMap.empty k in
|
||||
let black_list =
|
||||
@ -1066,17 +1006,17 @@ module Make (P: NET_PARAMS) = struct
|
||||
{ connections = Some (1, Unix.gettimeofday ()) ;
|
||||
unreachable_since = None ;
|
||||
white_listed ;
|
||||
meta = P.init_meta }
|
||||
meta = P.initial_metadata }
|
||||
| { connections = Some (n, _) ; white_listed } ->
|
||||
{ connections = Some (n + 1, Unix.gettimeofday ()) ;
|
||||
unreachable_since = None ;
|
||||
white_listed ;
|
||||
meta = P.init_meta }
|
||||
meta = P.initial_metadata }
|
||||
with Not_found ->
|
||||
{ connections = Some (1, Unix.gettimeofday ()) ;
|
||||
unreachable_since = None ;
|
||||
white_listed = white_listed point ;
|
||||
meta = P.init_meta }
|
||||
meta = P.initial_metadata }
|
||||
in
|
||||
(* if it's me, it's probably not me *)
|
||||
if my_gid = peer.gid then begin
|
||||
@ -1136,7 +1076,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
{ unreachable_since = None ;
|
||||
connections = None ;
|
||||
white_listed = false ;
|
||||
meta = P.init_meta } in
|
||||
meta = P.initial_metadata } in
|
||||
known_peers := PeerMap.update point source !known_peers ;
|
||||
LC.broadcast new_contact point)
|
||||
peers ;
|
||||
@ -1276,7 +1216,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
{ unreachable_since = None ;
|
||||
connections = None ;
|
||||
white_listed = true ;
|
||||
meta = P.init_meta },
|
||||
meta = P.initial_metadata },
|
||||
None in
|
||||
known_peers := PeerMap.update point ?gid source !known_peers
|
||||
and whitelist peer =
|
||||
@ -1292,11 +1232,11 @@ module Make (P: NET_PARAMS) = struct
|
||||
LC.broadcast please_maintain () ;
|
||||
waiter
|
||||
and roll () = Pervasives.failwith "roll"
|
||||
and get_meta _gid = None (* TODO: implement *)
|
||||
and set_meta _gid _meta = () (* TODO: implement *)
|
||||
and get_metadata _gid = None (* TODO: implement *)
|
||||
and set_metadata _gid _meta = () (* TODO: implement *)
|
||||
in
|
||||
let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ;
|
||||
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta } in
|
||||
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in
|
||||
(* main thread, returns after first successful maintenance *)
|
||||
maintain () >>= fun () ->
|
||||
debug "(%a) network succesfully bootstrapped" pp_gid my_gid ;
|
||||
@ -1318,10 +1258,10 @@ module Make (P: NET_PARAMS) = struct
|
||||
let maintain () = Lwt.return_unit in
|
||||
let roll () = Lwt.return_unit in
|
||||
let peer_info _ = assert false in
|
||||
let get_meta _ = None in
|
||||
let set_meta _ _ = () in
|
||||
let get_metadata _ = None in
|
||||
let set_metadata _ _ = () in
|
||||
{ shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ;
|
||||
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta }
|
||||
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata }
|
||||
|
||||
|
||||
(* Plug toplevel functions to callback calls. *)
|
||||
@ -1337,7 +1277,7 @@ module Make (P: NET_PARAMS) = struct
|
||||
let roll net = net.roll ()
|
||||
let blacklist _net _gid = ()
|
||||
let whitelist _net _gid = ()
|
||||
let get_meta net gid = net.get_meta gid
|
||||
let set_meta net gid meta = net.set_meta gid meta
|
||||
let get_metadata net gid = net.get_metadata gid
|
||||
let set_metadata net gid meta = net.set_metadata gid meta
|
||||
end
|
||||
|
||||
|
@ -56,36 +56,42 @@ type limits = {
|
||||
(** A global identifier for a peer, a.k.a. an identity *)
|
||||
type gid
|
||||
|
||||
type 'msg msg_encoding = Encoding : {
|
||||
type 'msg encoding = Encoding : {
|
||||
tag: int ;
|
||||
encoding: 'a Data_encoding.t ;
|
||||
wrap: 'a -> 'msg ;
|
||||
unwrap: 'msg -> 'a option ;
|
||||
max_length: int option ;
|
||||
} -> 'msg msg_encoding
|
||||
} -> 'msg encoding
|
||||
|
||||
module type NET_PARAMS = sig
|
||||
type meta (** Type of metadata associated to an identity *)
|
||||
type msg (** Type of message used by higher layers *)
|
||||
module type PARAMS = sig
|
||||
|
||||
val msg_encodings : msg msg_encoding list
|
||||
(** Type of message used by higher layers *)
|
||||
type msg
|
||||
|
||||
val init_meta : meta
|
||||
val score_enc : meta Data_encoding.t
|
||||
val score: meta -> float
|
||||
val encodings : msg encoding list
|
||||
|
||||
(** Type of metadata associated to an identity *)
|
||||
type metadata
|
||||
|
||||
val initial_metadata : metadata
|
||||
val metadata_encoding : metadata Data_encoding.t
|
||||
val score : metadata -> float
|
||||
|
||||
(** High level protocol(s) talked by the peer. When two peers
|
||||
initiate a connection, they exchange their list of supported
|
||||
versions. The chosen one, if any, is the maximum common one (in
|
||||
lexicographic order) *)
|
||||
val supported_versions : version list
|
||||
|
||||
end
|
||||
|
||||
module Make (P : NET_PARAMS) : sig
|
||||
module Make (P : PARAMS) : sig
|
||||
|
||||
type net
|
||||
|
||||
(** A faked p2p layer, which do not initiate any connection
|
||||
nor open any listening socket. *)
|
||||
nor open any listening socket *)
|
||||
val faked_network : net
|
||||
|
||||
(** Main network initialisation function *)
|
||||
@ -110,18 +116,18 @@ module Make (P : NET_PARAMS) : sig
|
||||
val find_peer : net -> gid -> peer option
|
||||
|
||||
type peer_info = {
|
||||
gid : gid;
|
||||
addr : addr;
|
||||
port : port;
|
||||
version : version;
|
||||
gid : gid ;
|
||||
addr : addr ;
|
||||
port : port ;
|
||||
version : version ;
|
||||
}
|
||||
|
||||
(** Access the info of an active peer, if available *)
|
||||
val peer_info : net -> peer -> peer_info
|
||||
|
||||
(** Accessors for meta information about a peer *)
|
||||
val get_meta : net -> gid -> P.meta option
|
||||
val set_meta : net -> gid -> P.meta -> unit
|
||||
(** Accessors for meta information about a global identifier *)
|
||||
val get_metadata : net -> gid -> P.metadata option
|
||||
val set_metadata : net -> gid -> P.metadata -> unit
|
||||
|
||||
(** Wait for a payload from any peer in the network *)
|
||||
val recv : net -> (peer * P.msg) Lwt.t
|
||||
@ -143,4 +149,5 @@ module Make (P : NET_PARAMS) : sig
|
||||
|
||||
(** Keep a connection to this pair as often as possible *)
|
||||
val whitelist : net -> gid -> unit
|
||||
|
||||
end
|
||||
|
@ -20,7 +20,7 @@ module Param = struct
|
||||
| Get_protocols of Protocol_hash.t list
|
||||
| Protocol of MBytes.t
|
||||
|
||||
let msg_encodings =
|
||||
let encodings =
|
||||
let open Data_encoding in
|
||||
let case ?max_length ~tag encoding unwrap wrap =
|
||||
P2p.Encoding { tag; encoding; wrap; unwrap; max_length } in
|
||||
@ -71,9 +71,9 @@ module Param = struct
|
||||
(fun proto -> Protocol proto);
|
||||
]
|
||||
|
||||
type meta = unit
|
||||
let init_meta = ()
|
||||
let score_enc = Data_encoding.empty
|
||||
type metadata = unit
|
||||
let initial_metadata = ()
|
||||
let metadata_encoding = Data_encoding.empty
|
||||
let score () = 0.
|
||||
|
||||
let supported_versions =
|
||||
|
@ -40,10 +40,10 @@ val peer_info : net -> peer -> peer_info
|
||||
|
||||
(** Accessors for meta information about a global identifier *)
|
||||
|
||||
type meta = unit
|
||||
type metadata = unit
|
||||
|
||||
val get_meta : net -> gid -> meta option
|
||||
val set_meta : net -> gid -> meta -> unit
|
||||
val get_metadata : net -> gid -> metadata option
|
||||
val set_metadata : net -> gid -> metadata -> unit
|
||||
|
||||
type net_id = Store.net_id
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user