P2p: refactor the mli

This commit is contained in:
Vincent Bernardoff 2016-11-07 14:32:10 +01:00 committed by Grégoire Henry
parent cbca39d4ea
commit ff1c08f876
14 changed files with 1452 additions and 1222 deletions

View File

@ -231,7 +231,6 @@ NODE_LIB_INTFS := \
node/shell/prevalidator.mli \
node/shell/validator.mli \
\
node/shell/messages.mli \
node/shell/discoverer.mli \
node/shell/node_rpc_services.mli \
node/shell/node.mli \
@ -257,9 +256,10 @@ NODE_LIB_IMPLS := \
node/updater/proto_environment.ml \
node/updater/register.ml \
\
node/shell/messages.ml \
node/shell/netparams.ml \
node/shell/state.ml \
\
node/shell/messages.ml \
node/shell/prevalidator.ml \
node/shell/validator.ml \
\

View File

@ -7,20 +7,25 @@
(* *)
(**************************************************************************)
module LU = Lwt_unix
module LC = Lwt_condition
open Lwt
open Lwt_utils
open Netbits
open Logging.Net
let pp_gid ppf gid =
Format.pp_print_string ppf (Hex_encode.hex_encode gid)
(* public types *)
type addr = Ipaddr.t
type port = int
type version = string * int * int
type version = {
name : string ;
major : int ;
minor : int ;
}
let version_encoding =
let open Data_encoding in
conv
(fun { name; major; minor } -> (name, major, minor))
(fun (name, major, minor) -> { name; major; minor })
(obj3
(req "name" string)
(req "major" int8)
(req "minor" int8))
type limits = {
max_packet_size : int ;
peer_answer_timeout : float ;
@ -32,12 +37,116 @@ type limits = {
type config = {
incoming_port : port option ;
discovery_port : port option ;
supported_versions : version list ;
known_peers : (addr * port) list ;
peers_file : string ;
closed_network : bool ;
}
type 'msg msg_encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg msg_encoding
module type NET_PARAMS = sig
type meta (** Type of metadata associated to an identity *)
type msg (** Type of message used by higher layers *)
val msg_encodings : msg msg_encoding list
val init_meta : meta
val score_enc : meta Data_encoding.t
val score: meta -> float
(** High level protocol(s) talked by the peer. When two peers
initiate a connection, they exchange their list of supported
versions. The chosen one, if any, is the maximum common one (in
lexicographic order) *)
val supported_versions : version list
end
module type S = sig
include NET_PARAMS
type net
(** A faked p2p layer, which do not initiate any connection
nor open any listening socket. *)
val faked_network : net
(** Main network initialisation function *)
val bootstrap : config:config -> limits:limits -> net Lwt.t
(** A maintenance operation : try and reach the ideal number of peers *)
val maintain : net -> unit Lwt.t
(** Voluntarily drop some peers and replace them by new buddies *)
val roll : net -> unit Lwt.t
(** Close all connections properly *)
val shutdown : net -> unit Lwt.t
(** A connection to a peer *)
type peer
(** A global identifier for a peer, a.k.a. an identity *)
type gid
(** Access the domain of active peers *)
val peers : net -> peer list
(** Return the active peer with identity [gid] *)
val find_peer : net -> gid -> peer option
type peer_info = {
gid : gid;
addr : addr;
port : port;
version : version;
}
(** Access the info of an active peer, if available *)
val peer_info : net -> peer -> peer_info
(** Accessors for meta information about a peer *)
val get_meta : net -> gid -> meta option
val set_meta : net -> gid -> meta -> unit
(** Wait for a payload from any peer in the network *)
val recv : net -> (peer * msg) Lwt.t
(** Send a payload to a peer and wait for it to be in the tube *)
val send : net -> peer -> msg -> unit Lwt.t
(** Send a payload to a peer without waiting for the result. Return
[true] if the message can be enqueued in the peer's output queue
or [false] otherwise. *)
val try_send : net -> peer -> msg -> bool
(** Send a payload to all peers *)
val broadcast : net -> msg -> unit
(** Shutdown the connection to all peers at this address and stop the
communications with this machine for [duration] seconds *)
val blacklist : net -> gid -> unit
(** Keep a connection to this pair as often as possible *)
val whitelist : net -> gid -> unit
end
module Make (P: NET_PARAMS) = struct
module LU = Lwt_unix
module LC = Lwt_condition
open Lwt
open Lwt_utils
open Netbits
open Logging.Net
let pp_gid ppf gid =
Format.pp_print_string ppf (Hex_encode.hex_encode gid)
(* the common version for a pair of peers, if any, is the maximum one,
in lexicographic order *)
let common_version la lb =
@ -57,6 +166,23 @@ type gid = string
(* A net point (address x port). *)
type point = addr * port
let point_encoding =
let open Data_encoding in
let open Ipaddr in
conv
(fun (addr, port) ->
(match addr with
| V4 v4 -> V4.to_bytes v4
| V6 v6 -> V6.to_bytes v6), port)
(fun (addr, port) ->
(match String.length addr with
| 4 -> V4 (V4.of_bytes_exn addr)
| 16 -> V6 (V6.of_bytes_exn addr)
| _ -> Pervasives.failwith "point_encoding"), port)
(obj2
(req "addr" string)
(req "port" int16))
(* Low-level network protocol packets (internal). The protocol is
completely symmetrical and asynchronous. First both peers must
present their credentials with a [Connect] packet, then any
@ -65,125 +191,186 @@ type point = addr * port
transmission (and needs not being replied). The [Unkown] packet is
not a real kind of packet, it means that something indecypherable
was transmitted. *)
type packet =
| Connect of gid * int option * version list
type hello = {
gid: gid;
port: int option;
versions: version list;
}
let hello_encoding =
let open Data_encoding in
conv
(fun { gid; port; versions } -> (gid, port, versions))
(fun (gid, port, versions) -> { gid; port; versions })
(obj3
(req "gid" (Fixed.string 16)) (* TODO: get rid of constant *)
(opt "port" int16)
(req "versions" (Variable.list version_encoding)))
type msg =
| Connect of hello
| Disconnect
| Advertise of (addr * port) list
| Message of Netbits.frame
| Advertise of point list
| Ping
| Pong
| Bootstrap
| Unknown of Netbits.frame
| Message of P.msg
(* read a packet from a TCP socket *)
let recv_packet
: LU.file_descr -> int -> packet Lwt.t
= fun socket limit ->
Netbits.read socket limit >>= function
| None ->
return Disconnect
| Some frame ->
let decode_versions msg frame cb =
let rec decode_versions acc = function
| F [ B name ; S maj ; S min ] :: rest ->
decode_versions ((MBytes.to_string name, maj, min) :: acc) rest
| [] -> cb (List.rev acc)
| _ -> return (Unknown msg)
in decode_versions [] frame
in
match frame with
| [ S 1 ] -> return Disconnect
| [ S 2 ] -> return Ping
| [ S 12 ] -> return Pong
| [ S 3 ] -> return Bootstrap
| [ S 4 ; B gid ; S port ; F rest ] as msg ->
decode_versions msg rest @@ fun versions ->
return (Connect (MBytes.to_string gid, Some port, versions))
| [ S 4 ; B gid ; F rest ] as msg ->
decode_versions msg rest @@ fun versions ->
return (Connect (MBytes.to_string gid, None, versions))
| [ S 5 ; F rest ] as msg ->
let rec decode_peers acc = function
| F [ B addr ; S port ] :: rest -> begin
match Ipaddr.of_string @@ MBytes.to_string addr with
| Some addr ->
decode_peers ((addr, port) :: acc) rest
| None ->
decode_peers acc rest
let msg_encoding =
let open Data_encoding in
union ~tag_size:`Uint8 begin [
case ~tag:0x00 hello_encoding
(function Connect hello -> Some hello | _ -> None)
(fun hello -> Connect hello);
case ~tag:0x01 null
(function Disconnect -> Some () | _ -> None)
(fun () -> Disconnect);
case ~tag:0x02 null
(function Ping -> Some () | _ -> None)
(fun () -> Ping);
case ~tag:0x03 null
(function Pong -> Some () | _ -> None)
(fun () -> Pong);
case ~tag:0x04 (Variable.list point_encoding)
(function Advertise points -> Some points | _ -> None)
(fun points -> Advertise points);
case ~tag:0x05 null
(function Bootstrap -> Some () | _ -> None)
(fun () -> Bootstrap);
] @
ListLabels.map P.msg_encodings ~f:begin function Encoding { tag; encoding; wrap; unwrap } ->
case ~tag encoding
(function Message msg -> unwrap msg | _ -> None)
(fun msg -> Message (wrap msg))
end
end
| [] -> Advertise (List.rev acc)
| _ -> Unknown msg
in return (decode_peers [] rest)
| [ S 6 ; F rest ] -> return (Message rest)
| msg -> return (Unknown msg)
(* send a packet over a TCP socket *)
let send_packet
: LU.file_descr -> packet -> bool Lwt.t
= fun socket packet ->
let frame = match packet with
| Unknown _ -> assert false (* should never happen *)
| Disconnect -> [ S 1 ]
| Ping -> [ S 2 ]
| Pong -> [ S 12 ]
| Bootstrap -> [ S 3 ]
| Connect (gid, port, versions) ->
let rec encode = function
| (name, maj, min) :: tl ->
let rest = encode tl in
F [ B (MBytes.of_string name) ; S maj ; S min ] :: rest
| [] -> []
let max_length = function
| 0 -> Some 1024
| 1 -> Some 0
| 2 -> Some 0
| 3 -> Some 0
| 4 -> Some (1 + 1000 * 17) (* tag + 1000 * max (point size) *)
| 5 -> Some 0
| n -> ListLabels.fold_left P.msg_encodings ~init:None ~f:begin fun a -> function
Encoding { tag; max_length } -> if tag = n then max_length else a
end
module BE = EndianBigstring.BigEndian
(** Read a message from a file descriptor and returns (tag, msg) *)
let read fd buf =
let rec read_into_exactly ?(pos=0) ?len descr buf =
let len = match len with None -> MBytes.length buf | Some l -> l in
let rec inner pos len =
if len = 0 then
Lwt.return_unit
else
Lwt_bytes.read descr buf pos len >>= fun nb_read ->
inner (pos + nb_read) (len - nb_read)
in
[ S 4 ; B (MBytes.of_string gid) ]
@ (match port with | Some port -> [ S port ] | None -> [])
@ [ F (encode versions) ]
| Advertise peers ->
let rec encode = function
| (addr, port) :: tl ->
let rest = encode tl in
F [ B (MBytes.of_string @@ Ipaddr.to_string addr) ; S port ] :: rest
| [] -> []
in [ S 5 ; F (encode peers) ]
| Message message -> [ S 6 ; F message ] in
Netbits.write socket frame
inner pos len
in
catch (fun () ->
Lwt_bytes.recv fd buf 0 4 [ Lwt_unix.MSG_PEEK ] >>= fun hdrlen ->
if hdrlen <> 4 then begin
debug "read: could not read enough bytes to determine message size, aborting";
return None
end
else
Lwt_bytes.read fd buf 0 4 >>= fun _hdrlen ->
let len = Int32.to_int (BE.get_int32 buf 0) in
if len < 0 || len > MBytes.length buf then begin
debug "read: invalid message size %d" len;
return None
end
else
read_into_exactly fd buf ~pos:4 ~len >|= fun () ->
let tag = BE.get_uint8 buf 4 in
Some (tag, MBytes.sub buf 4 len))
(function
| Unix.Unix_error (_err, _, _) -> return None
| e -> fail e)
(* A net handler, as a record-encoded object, abstract from the
outside world. Hidden Lwt workers are associated to a net at its
creation and can be killed using the shutdown callback. *)
type net = {
recv_from : unit -> (peer * Netbits.frame) Lwt.t ;
send_to : peer * Netbits.frame -> unit Lwt.t ;
push : peer * Netbits.frame -> unit ;
broadcast : Netbits.frame -> unit ;
blacklist : ?duration:float -> addr -> unit ;
whitelist : peer -> unit ;
maintain : unit -> unit Lwt.t ;
roll : unit -> unit Lwt.t ;
shutdown : unit -> unit Lwt.t ;
peers : unit -> peer list ;
peer_info : peer -> addr * port * version ;
}
(** Write a message to file descriptor. *)
let write ?(pos=0) ?len descr buf =
let len = match len with None -> MBytes.length buf | Some l -> l in
catch
(fun () ->
Lwt_bytes.write descr buf pos len >>= fun _nb_written ->
return true)
(function
| Unix.Unix_error _ -> return false
| e -> fail e)
(* read a message from a TCP socket *)
let recv_msg fd buf =
read fd buf >|= function
| None -> None
| Some (tag, msg) ->
match max_length tag with
| Some len when MBytes.length msg > len -> None
| _ -> Data_encoding.Binary.of_bytes msg_encoding msg
(* send a message over a TCP socket *)
let send_msg fd buf packet =
catch
(fun () ->
match Data_encoding.Binary.write msg_encoding packet buf 4 with
| None -> return false
| Some len ->
BE.set_int32 buf 0 @@ Int32.of_int (len - 4);
write fd buf ~len
)
(fun exn -> Lwt.fail exn)
(* A peer handle, as a record-encoded object, abstract from the
outside world. A hidden Lwt worker is associated to a peer at its
creation and is killed using the disconnect callback by net
workers (on shutdown of during maintenance). *)
and peer = {
type peer = {
gid : gid ;
point : point ;
listening_port : port option ;
version : version ;
last_seen : unit -> float ;
disconnect : unit -> unit Lwt.t;
send : packet -> unit Lwt.t ;
send : msg -> unit Lwt.t ;
}
type peer_info = {
gid : gid ;
addr : addr ;
port : port ;
version : version ;
}
(* A net handler, as a record-encoded object, abstract from the
outside world. Hidden Lwt workers are associated to a net at its
creation and can be killed using the shutdown callback. *)
type net = {
recv_from : unit -> (peer * P.msg) Lwt.t ;
send_to : peer -> P.msg -> unit Lwt.t ;
try_send : peer -> P.msg -> bool ;
broadcast : P.msg -> unit ;
blacklist : ?duration:float -> addr -> unit ;
whitelist : peer -> unit ;
maintain : unit -> unit Lwt.t ;
roll : unit -> unit Lwt.t ;
shutdown : unit -> unit Lwt.t ;
peers : unit -> peer list ;
find_peer : gid -> peer option ;
peer_info : peer -> peer_info ;
set_meta : gid -> P.meta -> unit ;
get_meta : gid -> P.meta option ;
}
(* The (internal) type of network events, those dispatched from peer
workers to the net and others internal to net workers. *)
and event =
type event =
| Disconnected of peer
| Bootstrap of peer
| Recv of peer * Netbits.frame
| Recv of peer * P.msg
| Peers of point list
| Contact of point * LU.file_descr
| Connected of peer
@ -286,22 +473,22 @@ let connect_to_peer config limits my_gid socket (addr, port) push white_listed =
(* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = canceler () in
(* a cancelable reception *)
let recv () =
pick [ recv_packet socket limits.max_packet_size ;
let recv buf =
pick [ (recv_msg socket buf >|= function Some p -> p | None -> Disconnect);
(cancelation () >>= fun () -> return Disconnect) ] in
(* First step: send and receive credentials, makes no difference
whether we're trying to connect to a peer or checking an incoming
connection, both parties must first present themselves. *)
let rec connect () =
send_packet socket (Connect (my_gid,
config.incoming_port,
config.supported_versions)) >>= fun _ ->
let rec connect buf =
send_msg socket buf (Connect { gid = my_gid ;
port = config.incoming_port ;
versions = P.supported_versions }) >>= fun _ ->
pick [ (LU.sleep limits.peer_answer_timeout >>= fun () -> return Disconnect) ;
recv () ] >>= function
| Connect (gid, listening_port, versions) ->
recv buf ] >>= function
| Connect { gid; port = listening_port; versions } ->
debug "(%a) connection requested from %a @ %a:%d"
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
begin match common_version config.supported_versions versions with
begin match common_version P.supported_versions versions with
| None ->
debug "(%a) connection rejected (incompatible versions) from %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ;
@ -310,7 +497,7 @@ let connect_to_peer config limits my_gid socket (addr, port) push white_listed =
if config.closed_network then
match listening_port with
| Some port when white_listed (addr, port) ->
connected version gid listening_port
connected buf version gid listening_port
| Some port ->
debug "(%a) connection rejected (out of the closed network) from %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ;
@ -320,7 +507,7 @@ let connect_to_peer config limits my_gid socket (addr, port) push white_listed =
pp_gid my_gid Ipaddr.pp_hum addr ;
cancel ()
else
connected version gid listening_port
connected buf version gid listening_port
end
| Advertise peers ->
(* alternatively, one can refuse a connection but reply with
@ -338,36 +525,31 @@ let connect_to_peer config limits my_gid socket (addr, port) push white_listed =
pp_gid my_gid Ipaddr.pp_hum addr port ;
cancel ()
(* Them we can build the net object and launch the worker. *)
and connected version gid listening_port =
and connected buf version gid listening_port =
(* net object state *)
let last = ref (Unix.gettimeofday ()) in
(* net object callbaks *)
let last_seen () = !last in
let disconnect () = cancel () in
let send p = send_packet socket p >>= fun _ -> return () in
let send p = send_msg socket buf p >>= fun _ -> return () in
(* net object construction *)
let peer = { gid ; point = (addr, port) ; listening_port ;
version ; last_seen ; disconnect ; send } in
(* The packet reception loop. *)
let rec receiver () =
recv () >>= fun packet ->
recv buf >>= fun packet ->
last := Unix.gettimeofday () ;
match packet with
| Connect _
| Unknown _ ->
debug "(%a) disconnected (bad request) %a @ %a:%d"
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
cancel ()
| Disconnect ->
debug "(%a) disconnected (by peer) %a @ %a:%d"
pp_gid my_gid pp_gid gid Ipaddr.pp_hum addr port ;
cancel ()
| Bootstrap -> push (Bootstrap peer) ; receiver ()
| Advertise peers -> push (Peers peers) ; receiver ()
| Ping -> send_packet socket Pong >>= fun _ -> receiver ()
| Ping -> send_msg socket buf Pong >>= fun _ -> receiver ()
| Pong -> receiver ()
| Message msg ->
push (Recv (peer, msg)) ; receiver ()
| Message msg -> push (Recv (peer, msg)) ; receiver ()
in
(* The polling loop *)
let rec pulse_monitor ping =
@ -388,7 +570,7 @@ let connect_to_peer config limits my_gid socket (addr, port) push white_listed =
if now -. !last < limits.peer_answer_timeout then
pulse_monitor None
else
send_packet socket Ping >>= fun _ ->
send_msg socket buf Ping >>= fun _ ->
pulse_monitor (Some (Unix.gettimeofday ()))
else return ()
in
@ -398,15 +580,16 @@ let connect_to_peer config limits my_gid socket (addr, port) push white_listed =
(* Launch both workers *)
join [ pulse_monitor None ; receiver () ]
in
let buf = MBytes.create 0x100_000 in
on_cancel (fun () ->
send_packet socket Disconnect >>= fun _ ->
send_msg socket buf Disconnect >>= fun _ ->
LU.close socket >>= fun _ ->
return ()) ;
let worker_name =
Format.asprintf
"(%a) connection handler for %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port in
ignore (worker ~safe:true worker_name ~run:connect ~cancel) ;
ignore (worker ~safe:true worker_name ~run:(fun () -> connect buf) ~cancel) ;
(* return the canceler *)
cancel
@ -454,10 +637,12 @@ let peers_file_encoding =
(req "port" int31))))))
(* Info on peers maintained between connections *)
type source =
{ unreachable_since : float option;
type source = {
unreachable_since : float option;
connections : (int * float) option ;
white_listed : bool }
white_listed : bool ;
meta : P.meta ;
}
(* Ad hoc comparison on sources such as good source < bad source *)
let compare_sources s1 s2 =
@ -570,7 +755,7 @@ let discovery_sender my_gid disco_port inco_port cancelation restart =
in loop 0.2 1
(* Main network creation and initialisation function *)
let bootstrap config limits =
let bootstrap ~config ~limits =
(* we need to ignore SIGPIPEs *)
Sys.(set_signal sigpipe Signal_ignore) ;
(* a non exception-based cancelation mechanism *)
@ -596,10 +781,12 @@ let bootstrap config limits =
let my_gid =
fresh_gid () in
let known_peers =
let source =
{ unreachable_since = None ;
let source = { unreachable_since = None ;
connections = None ;
white_listed = true } in
white_listed = true ;
meta = P.init_meta ;
}
in
List.fold_left
(fun r point -> PeerMap.update point source r)
PeerMap.empty config.known_peers in
@ -628,13 +815,15 @@ let bootstrap config limits =
let source =
{ unreachable_since = None ;
connections = None ;
white_listed = true } in
white_listed = true ;
meta = P.init_meta ; } in
PeerMap.update (addr, port) source r
| Some (c, t, gid) ->
let source =
{ unreachable_since = None ;
connections = Some (c, t) ;
white_listed = PointSet.mem (addr, port) white_list } in
white_listed = PointSet.mem (addr, port) white_list ;
meta = P.init_meta ; } in
PeerMap.update (addr, port) ~gid source r)
PeerMap.empty k in
let black_list =
@ -871,15 +1060,18 @@ let bootstrap config limits =
| { connections = None ; white_listed } ->
{ connections = Some (1, Unix.gettimeofday ()) ;
unreachable_since = None ;
white_listed }
white_listed ;
meta = P.init_meta }
| { connections = Some (n, _) ; white_listed } ->
{ connections = Some (n + 1, Unix.gettimeofday ()) ;
unreachable_since = None ;
white_listed}
white_listed ;
meta = P.init_meta }
with Not_found ->
{ connections = Some (1, Unix.gettimeofday ()) ;
unreachable_since = None ;
white_listed = white_listed point }
white_listed = white_listed point ;
meta = P.init_meta }
in
(* if it's me, it's probably not me *)
if my_gid = peer.gid then begin
@ -928,8 +1120,8 @@ let bootstrap config limits =
let sample = bootstrap_peers () in
Lwt.async (fun () -> peer.send (Advertise sample)) ;
main ()
| Recv (peer, message) ->
enqueue_msg (peer, message) ;
| Recv (peer, msg) ->
enqueue_msg (peer, msg) ;
main ()
| Peers peers ->
List.iter
@ -938,7 +1130,8 @@ let bootstrap config limits =
let source =
{ unreachable_since = None ;
connections = None ;
white_listed = false } in
white_listed = false ;
meta = P.init_meta } in
known_peers := PeerMap.update point source !known_peers ;
LC.broadcast new_contact point)
peers ;
@ -972,6 +1165,7 @@ let bootstrap config limits =
let main = worker (Format.asprintf "(%a) reception" pp_gid my_gid) main cancel in
let unblock = worker (Format.asprintf "(%a) unblacklister" pp_gid my_gid) unblock cancel in
let discovery_answerer =
let buf = MBytes.create 0x100_000 in
match config.discovery_port with
| Some disco_port ->
let answerer () =
@ -985,7 +1179,7 @@ let bootstrap config limits =
(* either reply by a list of peer or connect if we need peers *)
if PeerMap.cardinal !connected >= limits.expected_connections then begin
enqueue_event (Peers [ addr, port ]) ;
send_packet socket (Advertise (bootstrap_peers ())) >>= fun _ ->
send_msg socket buf (Advertise (bootstrap_peers ())) >>= fun _ ->
LU.close socket
end else begin
enqueue_event (Contact ((addr, port), socket)) ;
@ -1029,14 +1223,19 @@ let bootstrap config limits =
return ()
and peers () =
PeerMap.fold (fun _ _ peer r -> peer :: r) !connected []
and peer_info peer =
fst peer.point, snd peer.point, peer.version
and find_peer gid = try Some (PeerMap.by_gid gid !connected) with Not_found -> None
and peer_info (peer : peer) = {
gid = peer.gid ;
addr = fst peer.point ;
port = snd peer.point ;
version = peer.version ;
}
and recv_from () =
dequeue_msg ()
and send_to (peer, msg) =
and send_to peer msg =
peer.send (Message msg) >>= fun _ -> return ()
and push (peer, msg) =
Lwt.async (fun () -> peer.send (Message msg))
and try_send peer msg =
Lwt.async (fun () -> peer.send (Message msg)); true
and broadcast msg =
PeerMap.iter
(fun _ _ peer ->
@ -1071,7 +1270,8 @@ let bootstrap config limits =
with Not_found ->
{ unreachable_since = None ;
connections = None ;
white_listed = true },
white_listed = true ;
meta = P.init_meta },
None in
known_peers := PeerMap.update point ?gid source !known_peers
and whitelist peer =
@ -1087,9 +1287,11 @@ let bootstrap config limits =
LC.broadcast please_maintain () ;
waiter
and roll () = Pervasives.failwith "roll"
and get_meta _gid = None (* TODO: implement *)
and set_meta _gid _meta = () (* TODO: implement *)
in
let net = { shutdown ; peers ; recv_from ; send_to ; push ; broadcast ;
blacklist ; whitelist ; maintain ; roll ; peer_info } in
let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ;
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta } in
(* main thread, returns after first successful maintenance *)
maintain () >>= fun () ->
debug "(%a) network succesfully bootstrapped" pp_gid my_gid ;
@ -1101,28 +1303,36 @@ let faked_network =
Lwt.wakeup_exn wakeup Lwt_stream.Empty;
Lwt.return_unit in
let peers () = [] in
let find_peer _ = None in
let recv_from () = infinity in
let send_to _ = Lwt.return_unit in
let push _ = () in
let send_to _ _ = Lwt.return_unit in
let try_send _ _ = true in
let broadcast _ = () in
let blacklist ?duration _ = ignore duration ; () in
let whitelist _ = () in
let maintain () = Lwt.return_unit in
let roll () = Lwt.return_unit in
let peer_info _ = assert false in
{ shutdown ; peers ; recv_from ; send_to ; push ; broadcast ;
blacklist ; whitelist ; maintain ; roll ; peer_info }
let get_meta _ = None in
let set_meta _ _ = () in
{ shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ;
blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta }
(* Plug toplevel functions to callback calls. *)
let shutdown net = net.shutdown ()
let peers net = net.peers ()
let peer_info peer net = net.peer_info peer
let find_peer net gid = net.find_peer gid
let peer_info net peer = net.peer_info peer
let recv net = net.recv_from ()
let send (peer, msg) net = net.send_to (peer, msg)
let push peer net = net.push peer
let broadcast msg net = net.broadcast msg
let send net peer msg = net.send_to peer msg
let try_send net peer = net.try_send peer
let broadcast net msg = net.broadcast msg
let maintain net = net.maintain ()
let roll net = net.roll ()
let blacklist ?duration peer net = net.blacklist ?duration peer
let whitelist peer net = net.whitelist peer
let blacklist _net _gid = ()
let whitelist _net _gid = ()
let get_meta net gid = net.get_meta gid
let set_meta net gid meta = net.set_meta gid meta
end

View File

@ -7,21 +7,18 @@
(* *)
(**************************************************************************)
(** A P2P network *)
type net
(** A faked p2p layer, which do not initiate any connection
nor open any listening socket. *)
val faked_network : net
(** A peer connection address *)
type addr = Ipaddr.t
(** A peer connection port *)
type port = int
(** A protocol version tag: (name, major, minor) *)
type version = string * int * int
(** A p2p protocol version *)
type version = {
name : string ;
major : int ;
minor : int ;
}
(** Network configuration *)
type config = {
@ -31,11 +28,6 @@ type config = {
(** Tells if peers should be discovered automatically on the local
network, precising the UDP port to use *)
discovery_port : port option ;
(** High level protocol(s) talked by the peer. When two peers
initiate a connection, they exchange their list of supported
versions. The chosen one, if any, is the maximum common one (in
lexicographic order) *)
supported_versions : version list ;
(** List of hard-coded known peers to bootstrap the network from *)
known_peers : (addr * port) list ;
(** The path to the JSON file where the peer cache is loaded / stored *)
@ -47,7 +39,7 @@ type config = {
(** Network capacities *)
type limits = {
(** Maximum length in bytes of network frames *)
(** Maximum length in bytes of network messages' payload *)
max_packet_size : int ;
(** Delay after which a non responding peer is considered dead *)
peer_answer_timeout : float ;
@ -61,8 +53,40 @@ type limits = {
blacklist_time : float ;
}
type 'msg msg_encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg msg_encoding
module type NET_PARAMS = sig
type meta (** Type of metadata associated to an identity *)
type msg (** Type of message used by higher layers *)
val msg_encodings : msg msg_encoding list
val init_meta : meta
val score_enc : meta Data_encoding.t
val score: meta -> float
(** High level protocol(s) talked by the peer. When two peers
initiate a connection, they exchange their list of supported
versions. The chosen one, if any, is the maximum common one (in
lexicographic order) *)
val supported_versions : version list
end
module Make (P : NET_PARAMS) : sig
type net
(** A faked p2p layer, which do not initiate any connection
nor open any listening socket. *)
val faked_network : net
(** Main network initialisation function *)
val bootstrap : config -> limits -> net Lwt.t
val bootstrap : config:config -> limits:limits -> net Lwt.t
(** A maintenance operation : try and reach the ideal number of peers *)
val maintain : net -> unit Lwt.t
@ -76,27 +100,47 @@ val shutdown : net -> unit Lwt.t
(** A connection to a peer *)
type peer
(** A global identifier for a peer, a.k.a. an identity *)
type gid
(** Access the domain of active peers *)
val peers : net -> peer list
(** Return the active peer with identity [gid] *)
val find_peer : net -> gid -> peer option
type peer_info = {
gid : gid;
addr : addr;
port : port;
version : version;
}
(** Access the info of an active peer, if available *)
val peer_info : peer -> net -> addr * port * version
val peer_info : net -> peer -> peer_info
(** Wait for a Netbits.frame from any peer in the network *)
val recv : net -> (peer * Netbits.frame) Lwt.t
(** Accessors for meta information about a peer *)
val get_meta : net -> gid -> P.meta option
val set_meta : net -> gid -> P.meta -> unit
(** Send a Netbits.frame to a peer and wait for it to be in the tube *)
val send : peer * Netbits.frame -> net -> unit Lwt.t
(** Wait for a payload from any peer in the network *)
val recv : net -> (peer * P.msg) Lwt.t
(** Send a Netbits.frame to a peer asynchronously *)
val push : peer * Netbits.frame -> net -> unit
(** Send a payload to a peer and wait for it to be in the tube *)
val send : net -> peer -> P.msg -> unit Lwt.t
(** Send a Netbits.frame to all peers *)
val broadcast : Netbits.frame -> net -> unit
(** Send a payload to a peer without waiting for the result. Return
[true] if the message can be enqueued in the peer's output queue
or [false] otherwise. *)
val try_send : net -> peer -> P.msg -> bool
(** Send a payload to all peers *)
val broadcast : net -> P.msg -> unit
(** Shutdown the connection to all peers at this address and stop the
communications with this machine for [duration] seconds *)
val blacklist : ?duration:float -> addr -> net -> unit
val blacklist : net -> gid -> unit
(** Keep a connection to this pair as often as possible *)
val whitelist : peer -> net -> unit
val whitelist : net -> gid -> unit
end

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
module P2p = Netparams
type worker = {
shutdown: unit -> unit Lwt.t;
}
@ -15,7 +17,7 @@ let create_worker p2p state =
let cancelation, cancel, _on_cancel = Lwt_utils.canceler () in
let broadcast m = P2p.broadcast (Messages.to_frame m) p2p in
let broadcast m = P2p.broadcast p2p m in
let discovery_worker =
let rec worker_loop () =

View File

@ -9,6 +9,6 @@
type worker
val create_worker: P2p.net -> State.t -> worker
val create_worker: Netparams.net -> State.t -> worker
val shutdown: worker -> unit Lwt.t

View File

@ -7,11 +7,9 @@
(* *)
(**************************************************************************)
open Netbits
type net_id = Store.net_id
type message =
type t =
| Discover_blocks of net_id * Block_hash.t list (* Block locator *)
| Block_inventory of net_id * Block_hash.t list
@ -28,65 +26,53 @@ type message =
| Get_protocols of Protocol_hash.t list
| Protocol of MBytes.t
let encoding =
let open Data_encoding in
let case ?max_length ~tag encoding unwrap wrap =
P2p.Encoding { tag; encoding; wrap; unwrap; max_length }
in [
case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding))
(function
| Discover_blocks (Net genesis_bh, bhs) -> Some (genesis_bh, bhs)
| _ -> None)
(fun (genesis_bh, bhs) -> Discover_blocks (Net genesis_bh, bhs));
case ~tag:0x11 (tup2 Block_hash.encoding (list Block_hash.encoding))
(function
| Block_inventory (Net genesis_bh, bhs) -> Some (genesis_bh, bhs)
| _ -> None)
(fun (genesis_bh, bhs) -> Block_inventory (Net genesis_bh, bhs));
let to_frame msg =
case ~tag:0x12 (list Block_hash.encoding)
(function
| Get_blocks bhs -> Some bhs
| _ -> None)
(fun bhs -> Get_blocks bhs);
case ~tag:0x13 Data_encoding.bytes
(function Block b -> Some b | _ -> None)
(fun b -> Block b);
let bh h = B (Block_hash.to_bytes h) in
let oph h = B (Operation_hash.to_bytes h) in
let ph h = B (Protocol_hash.to_bytes h) in
match msg with
case ~tag:0x20 Block_hash.encoding
(function Current_operations (Net genesis_bh) -> Some genesis_bh | _ -> None)
(fun genesis_bh -> Current_operations (Net genesis_bh));
case ~tag:0x21 (tup2 Block_hash.encoding (list Operation_hash.encoding))
(function Operation_inventory ((Net genesis_bh), ops) -> Some (genesis_bh, ops) | _ -> None)
(fun (genesis_bh, ops) -> Operation_inventory (Net genesis_bh, ops));
| Discover_blocks (Net netid, blocks) ->
[ S 2100 ; bh netid ; F (List.map bh blocks) ]
| Block_inventory (Net netid, blocks) ->
[ S 2101 ; bh netid ; F (List.map bh blocks) ]
| Get_blocks blocks ->
[ S 2102 ; F (List.map bh blocks) ]
| Block b ->
[ S 2103 ; B b ]
| Current_operations (Net net_id) ->
[ S 2700 ; bh net_id ]
| Operation_inventory (Net net_id, ops) ->
[ S 2701 ; bh net_id ; F (List.map oph ops) ]
| Get_operations ops ->
[ S 2702 ; F (List.map oph ops) ]
| Operation b ->
[ S 2703 ; B b ]
| Get_protocols protos ->
[ S 2800 ; F (List.map ph protos) ]
| Protocol p ->
[ S 2801 ; B p ]
let from_frame msg =
let bh = function B s -> (Block_hash.of_bytes s) | _ -> invalid_arg "bh" in
let oph = function B s -> (Operation_hash.of_bytes s) | _ -> invalid_arg "oph" in
let ph = function B s -> (Protocol_hash.of_bytes s) | _ -> invalid_arg "ph" in
let net = function netid -> Store.Net (Block_hash.of_bytes netid) in
try match msg with
| [ S 2100 ; B netid ; F blocks ] ->
Some (Discover_blocks (net netid, List.map bh blocks))
| [ S 2101 ; B netid ; F blocks ] ->
Some (Block_inventory (net netid, List.map bh blocks))
| [ S 2102 ; F blocks ] ->
Some (Get_blocks (List.map bh blocks))
| [ S 2103 ; B bh ] -> Some (Block bh)
| [ S 2700 ; B netid ] ->
Some (Current_operations (net netid))
| [ S 2701 ; B netid ; F ops ] ->
Some (Operation_inventory (net netid, List.map oph ops))
| [ S 2702 ; F ops ] ->
Some (Get_operations (List.map oph ops))
| [ S 2703 ; B contents ] -> Some (Operation contents)
| [ S 2800 ; F protos ] -> Some (Get_protocols (List.map ph protos))
| [ S 2801 ; B contents ] -> Some (Protocol contents)
| _ -> None
with Invalid_argument _ -> None
case ~tag:0x22 (list Operation_hash.encoding)
(function
| Get_operations ops -> Some ops
| _ -> None)
(fun ops -> Get_operations ops);
case ~tag:0x23 Data_encoding.bytes
(function Operation o -> Some o | _ -> None)
(fun o -> Operation o);
case ~tag:0x32 (list Protocol_hash.encoding)
(function
| Get_protocols protos -> Some protos
| _ -> None)
(fun protos -> Get_protocols protos);
case ~tag:0x33 Data_encoding.bytes
(function Protocol proto -> Some proto | _ -> None)
(fun proto -> Protocol proto);
]

View File

@ -8,7 +8,7 @@
(**************************************************************************)
(** High level messages *)
type message =
type t =
| Discover_blocks of Store.net_id * Block_hash.t list (* Block locator *)
| Block_inventory of Store.net_id * Block_hash.t list
@ -25,9 +25,4 @@ type message =
| Get_protocols of Protocol_hash.t list
| Protocol of MBytes.t
(** Converts a high level message to a network frame *)
val to_frame: message -> Netbits.frame
(** Tries and convert a network frame to a high level message *)
val from_frame: Netbits.frame -> message option
val encoding : t P2p.msg_encoding list

View File

@ -7,12 +7,12 @@
(* *)
(**************************************************************************)
module P2p = Netparams
open Logging.Node.Worker
let (>|=) = Lwt.(>|=)
let supported_versions = ["TEZOS", 0, 0]
let inject_operation validator ?force bytes =
let t =
match Store.Operation.of_bytes bytes with
@ -194,18 +194,17 @@ type t = {
let request_operations net _net_id operations =
(* TODO improve the lookup strategy.
For now simply broadcast the request to all our neighbours. *)
P2p.broadcast
(Messages.(to_frame (Get_operations operations))) net
P2p.broadcast net (Get_operations operations)
let request_blocks net _net_id blocks =
(* TODO improve the lookup strategy.
For now simply broadcast the request to all our neighbours. *)
P2p.broadcast (Messages.(to_frame (Get_blocks blocks))) net
P2p.broadcast net (Get_blocks blocks)
let request_protocols net protocols =
(* TODO improve the lookup strategy.
For now simply broadcast the request to all our neighbours. *)
P2p.broadcast (Messages.(to_frame (Get_protocols protocols))) net
P2p.broadcast net (Get_protocols protocols)
let init_p2p net_params =
match net_params with
@ -244,19 +243,10 @@ let create
lwt_log_info "starting worker..." >>= fun () ->
let worker =
let handle_msg peer frame =
lwt_log_info "received message" >>= fun () ->
match Messages.from_frame frame with
| None ->
lwt_warn "can't parse message" >>= fun () ->
(* FIXME 60 second ? parameter... and Log_notice *)
let addr, _, _ = P2p.peer_info peer p2p in
P2p.blacklist ~duration:60. addr p2p ;
Lwt.return_unit
| Some msg ->
let handle_msg peer msg =
process state validator msg >>= fun msgs ->
List.iter
(fun msg -> P2p.push (peer, Messages.to_frame msg) p2p)
(fun msg -> ignore @@ P2p.try_send p2p peer msg)
msgs;
Lwt.return_unit
in

View File

@ -9,8 +9,6 @@
type t
val supported_versions: P2p.version list
val create:
genesis:Store.genesis ->
store_root:string ->

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
module P2p = Netparams
open Logging.Node.Prevalidator
let preapply
@ -95,9 +97,7 @@ let create p2p net =
Lwt.return_unit in
let broadcast_operation ops =
P2p.broadcast
Messages.(to_frame @@ Operation_inventory (State.Net.id net, ops))
p2p in
P2p.broadcast p2p (Operation_inventory (State.Net.id net, ops)) in
let handle_unprocessed () =
if Operation_hash_set.is_empty !unprocessed then

View File

@ -26,6 +26,8 @@
*)
module P2p = Netparams
type t
(** Creation and destruction of a "prevalidation" worker. *)

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
module P2p = Netparams
open Logging.Node.Validator
type worker = {
@ -43,7 +45,7 @@ let test_validator w = w.test_validator ()
let fetch_block v = v.fetch_block
let prevalidator v = v.prevalidator
let broadcast w m = P2p.broadcast (Messages.to_frame m) w.p2p
let broadcast w m = P2p.broadcast w.p2p m
(** Current block computation *)

View File

@ -9,6 +9,8 @@
type worker
module P2p = Netparams
val create_worker: P2p.net -> State.t -> worker
val shutdown: worker -> unit Lwt.t

View File

@ -288,7 +288,6 @@ let init_node () =
{ incoming_port = Globals.incoming_port#get ;
discovery_port =
if Globals.discovery_port#get then Some 7732 else None ;
supported_versions = Node.supported_versions ;
known_peers = Globals.bootstrap_peers#get ;
peers_file = Globals.peers_file#get ;
closed_network = Globals.closed_network#get }