Shell: replace Netbits by Data_encoding

This commit is contained in:
Grégoire Henry 2016-11-15 01:33:12 +01:00
parent 5e26e1b9df
commit 450a0fec15
5 changed files with 28 additions and 292 deletions

View File

@ -210,7 +210,7 @@ clean::
############################################################################ ############################################################################
NODE_LIB_INTFS := \ NODE_LIB_INTFS := \
node/net/netbits.mli \ \
node/net/p2p.mli \ node/net/p2p.mli \
node/net/RPC.mli \ node/net/RPC.mli \
\ \
@ -237,9 +237,9 @@ NODE_LIB_INTFS := \
node/shell/node_rpc.mli \ node/shell/node_rpc.mli \
NODE_LIB_IMPLS := \ NODE_LIB_IMPLS := \
\
compiler/node_compiler_main.ml \ compiler/node_compiler_main.ml \
\ \
node/net/netbits.ml \
node/net/p2p.ml \ node/net/p2p.ml \
node/net/RPC.ml \ node/net/RPC.ml \
\ \

View File

@ -1,208 +0,0 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** The type of a single datum in a network frame. The encoding of a
datum is as follows: [[TYPE][CONTENTS]], where [[type]] is a
single byte whose value is [1] for [S], [2] for [I], [3] for [L],
[4] for B, [5] for [D], [6] for [F] and [7] for [C].
For [S]. [I], [L] and [D]¸ the raw values are stored using big
endianness. For [B], [F] and [C], the size is prefixed as a 16-bit,
big endian, unsigned integer
([[SIZE][BYTES]]). *)
type chunk =
| S of int (** A 16-bit integer *)
| I of int32 (** A 32-bit integer *)
| L of int64 (** A 64-bit integer *)
| B of MBytes.t (** A series of bytes *)
| D of float (** A 64-bits IEEE-754 floating point number *)
| F of frame (** An encapsulated subframe *)
| C of string (** A string *)
(** A network frame is a list of simple data. Its encoding on the
network is as follows: [[SIZE][DATA]] where [[SIZE]] is the raw
length of [[DATA]] in bytes as a big endian, 32-bit, unsigned
integer. *)
and frame =
chunk list
(** Pretty printing of frames for debugging *)
let rec print fmtr frame =
Format.fprintf fmtr "[@[<hv 2>" ;
let rec loop frame =
let sep = match frame with [ _ ] -> "" | _ -> " ;" in
match frame with
| [] -> ()
| e :: tl ->
begin match e with
| S i -> Format.fprintf fmtr "@ S %i%s@," i sep
| I i -> Format.fprintf fmtr "@ I %li%s@," i sep
| L i -> Format.fprintf fmtr "@ L %Li%s@," i sep
| D i -> Format.fprintf fmtr "@ D %g%s@," i sep
| B i -> Format.fprintf fmtr "@ B %S%s@," (MBytes.to_string i) sep
| F f -> Format.fprintf fmtr "@ F %a%s@," print f sep
| C s -> Format.fprintf fmtr "@ C %s%s@," s sep
end ;
loop tl
in loop frame ;
Format.fprintf fmtr "@ @]]%!"
(** Pretty prints of frames *)
let to_string frame =
let buf = Buffer.create 100 in
let fmtr = Format.formatter_of_buffer buf in
print fmtr frame ;
Buffer.contents buf
module BE = EndianBigstring.BigEndian
(** Encode a frame as raw bytes to send over the network *)
let to_raw frame =
let rec raw_size frame =
List.fold_left
(fun sz item -> sz + 1 + match item with
| S _ -> 2
| I _ -> 4
| L _ -> 8
| D _ -> 8
| F f -> raw_size f + 2
| B str -> MBytes.length str + 2
| C str -> String.length str + 2)
0 frame
in
let sz = raw_size frame in
let buf = MBytes.create (sz + 4) in
let rec store items offset = match items with
| S n :: tl ->
BE.set_int8 buf offset 0x01 ;
BE.set_int16 buf (offset + 1) n ;
store tl (offset + 1 + 2)
| I n :: tl ->
BE.set_int8 buf offset 0x02 ;
BE.set_int32 buf (offset + 1) n ;
store tl (offset + 1 + 4)
| L n :: tl ->
BE.set_int8 buf offset 0x03 ;
BE.set_int64 buf (offset + 1) n ;
store tl (offset + 1 + 8)
| B n :: tl ->
BE.set_int8 buf offset 0x04 ;
let len = MBytes.length n in
BE.set_int16 buf (offset + 1) len ;
MBytes.blit n 0 buf (offset + 1 + 2) len ;
store tl (offset + 1 + 2 + len)
| D n :: tl ->
BE.set_int8 buf offset 0x05 ;
BE.set_int64 buf (offset + 1) (Int64.bits_of_float n) ;
store tl (offset + 1 + 8)
| F f :: tl ->
BE.set_int8 buf offset 0x06 ;
let len = raw_size f in
BE.set_int16 buf (offset + 1) len ;
let offset = store f (offset + 1 + 2) in
store tl offset
| C n :: tl ->
BE.set_int8 buf offset 0x07 ;
let len = String.length n in
BE.set_int16 buf (offset + 1) len ;
MBytes.blit_from_string n 0 buf (offset + 1 + 2) len ;
store tl (offset + 1 + 2 + len)
| [] -> offset
in
BE.set_int32 buf 0 (Int32.of_int sz) ;
ignore (store frame 4) ;
buf
(** Decode a complete raw frame as read from the network *)
let of_raw buf =
let rec decode items offset stop =
let if_remains ofs sz cb = if ofs + sz <= stop then cb () else None in
if offset = stop then Some (List.rev items)
else if offset > stop then None
else
let tag = BE.get_int8 buf offset in
let offset = offset + 1 in
match tag with
| 0x01 ->
if_remains offset 2 @@ fun () ->
let items = S (BE.get_int16 buf offset) :: items in
decode items (offset + 2) stop
| 0x02 ->
if_remains offset 4 @@ fun () ->
let items = I (BE.get_int32 buf offset) :: items in
decode items (offset + 4) stop
| 0x03 ->
if_remains offset 8 @@ fun () ->
let items = L (BE.get_int64 buf offset) :: items in
decode items (offset + 8) stop
| 0x04 ->
if_remains offset 2 @@ fun () ->
let len = BE.get_uint16 buf offset in
let offset = offset + 2 in
if_remains offset len @@ fun () ->
let items = B (MBytes.sub buf offset len) :: items in
decode items (offset + len) stop
| 0x05 ->
if_remains offset 8 @@ fun () ->
let items = D (Int64.float_of_bits (BE.get_int64 buf offset)) :: items in
decode items (offset + 8) stop
| 0x06 ->
if_remains offset 2 @@ fun () ->
let len = BE.get_uint16 buf offset in
let offset = offset + 2 in
if_remains offset len @@ fun () ->
begin match decode [] offset (offset + len) with
| None -> None
| Some fitems -> decode ((F fitems) :: items) (offset + len) stop
end
| 0x07 ->
if_remains offset 2 @@ fun () ->
let len = BE.get_uint16 buf offset in
let offset = offset + 2 in
if_remains offset len @@ fun () ->
let items = C (MBytes.substring buf offset len) :: items in
decode items (offset + len) stop
| _ -> None
in
decode [] 4 (MBytes.length buf)
open Lwt
(** Write a frame from to file descriptor. *)
let write descr frame =
let buf = to_raw frame in
catch
(fun () ->
Lwt_bytes.write descr buf 0 (MBytes.length buf) >>= fun _ ->
return true)
(function
| Unix.Unix_error _ -> return false
| e -> fail e)
(** Read a frame from a file descriptor. *)
let read descr limit =
catch
(fun () ->
let szbuf = MBytes.create 4 in
Lwt_bytes.recv descr szbuf 0 4 [ Lwt_unix.MSG_PEEK ] >>= fun wsz ->
if wsz <> 4 then
return None
else
let len = Int32.to_int (BE.get_int32 szbuf 0) + 4 in
if len < 0 || len > limit then
return None
else
let buf = MBytes.create len in
Lwt_bytes.read descr buf 0 len >>= fun wsz ->
if wsz <> len then
return None
else
return (of_raw buf))
(function
| Unix.Unix_error (_err, _, _) -> return None
| e -> fail e)

View File

@ -1,61 +0,0 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** A simple, portable implementation of network frames. *)
(** The type of a single datum in a network frame. The encoding of a
datum is as follows: [[TYPE][CONTENTS]], where [[type]] is a
single byte whose value is [1] for [S], [2] for [I], [3] for [L],
[4] for B, [5] for [D], [6] for [F] and [7] for [C].
For [S]. [I], [L] and [D]¸ the raw values are stored using big
endianness. For [B], [F] and [C], the size is prefixed as a 16-bit,
big endian, unsigned integer
([[SIZE][BYTES]]). *)
type chunk =
| S of int (** A 16-bit integer *)
| I of int32 (** A 32-bit integer *)
| L of int64 (** A 64-bit integer *)
| B of MBytes.t (** A series of bytes *)
| D of float (** A 64-bits IEEE-754 floating point number *)
| F of frame (** An encapsulated subframe *)
| C of string (** A string *)
(** A network frame is a list of simple data. Its encoding on the
network is as follows: [[SIZE][DATA]] where [[SIZE]] is the raw
length of [[DATA]] in bytes as a big endian, 32-bit, unsigned
integer. *)
and frame =
chunk list
(** Writes a frame from to file descriptor Returns [true] if
successful, [false] if an error happened, which means that the
descriptor cannot accept any more data and should be closed. *)
val write : Lwt_unix.file_descr -> frame -> bool Lwt.t
(** Reads a frame from a file descriptor. Returns [Some frame] if
successful, [None] if an error happened, which means either that
that the descriptor cannot provide any more data or that corrupted
bytes have been received, and in any case says that the descriptor
should not be used anymore. The second parameter is the limit in
bytes of the underlying representation, including the size. [None]
is returned in case of overhead, and the bytes are not consumed
from the descriptor. *)
val read : Lwt_unix.file_descr -> int -> frame option Lwt.t
(** Pretty printing of frames for debugging *)
val print : Format.formatter -> frame -> unit
(** Pretty prints of frames *)
val to_string : frame -> string
(** Encode a frame as raw bytes to send over the network *)
val to_raw : frame -> MBytes.t
(** Decode a complete raw frame as read from the network *)
val of_raw : MBytes.t -> frame option

View File

@ -42,6 +42,11 @@ type config = {
closed_network : bool ; closed_network : bool ;
} }
(* The global net identificator. *)
type gid = string
let gid_length = 16
type 'msg msg_encoding = Encoding : { type 'msg msg_encoding = Encoding : {
tag: int ; tag: int ;
encoding: 'a Data_encoding.t ; encoding: 'a Data_encoding.t ;
@ -91,9 +96,6 @@ module type S = sig
(** A connection to a peer *) (** A connection to a peer *)
type peer type peer
(** A global identifier for a peer, a.k.a. an identity *)
type gid
(** Access the domain of active peers *) (** Access the domain of active peers *)
val peers : net -> peer list val peers : net -> peer list
@ -141,7 +143,6 @@ module Make (P: NET_PARAMS) = struct
module LC = Lwt_condition module LC = Lwt_condition
open Lwt open Lwt
open Lwt_utils open Lwt_utils
open Netbits
open Logging.Net open Logging.Net
let pp_gid ppf gid = let pp_gid ppf gid =
@ -160,9 +161,6 @@ module Make (P: NET_PARAMS) = struct
else find (la, tb) else find (la, tb)
in find (la, lb) in find (la, lb)
(* The global net identificator. *)
type gid = string
(* A net point (address x port). *) (* A net point (address x port). *)
type point = addr * port type point = addr * port
@ -668,16 +666,20 @@ module Make (P: NET_PARAMS) = struct
(* The (fixed size) broadcast frame. *) (* The (fixed size) broadcast frame. *)
let discovery_message_encoding =
let open Data_encoding in
tup3 (Fixed.string 8) (Fixed.string gid_length) int16
let discovery_message gid port = let discovery_message gid port =
Netbits.([ B (MBytes.of_string "DISCO") ; B (MBytes.of_string gid) ; S port ]) Data_encoding.Binary.to_bytes
discovery_message_encoding
("DISCOVER", gid, port)
(* Broadcast frame verifier. *) (* Broadcast frame verifier. *)
let answerable_discovery_message message my_gid when_ok when_not = let answerable_discovery_message msg my_gid when_ok when_not =
match message with match msg with
| Some [ B magic ; B gid ; S port ] -> | Some ("DISCOVER", gid, port) when gid <> my_gid ->
if MBytes.to_string magic = "DISCO" && MBytes.to_string gid <> my_gid then
when_ok gid port when_ok gid port
else when_not ()
| _ -> when_not () | _ -> when_not ()
let string_of_unix_exn = function let string_of_unix_exn = function
@ -703,7 +705,7 @@ module Make (P: NET_PARAMS) = struct
| Some main_socket -> | Some main_socket ->
(* the answering function *) (* the answering function *)
let rec step () = let rec step () =
let buffer = Netbits.to_raw (discovery_message my_gid 0) in let buffer = discovery_message my_gid 0 in
let len = MBytes.length buffer in let len = MBytes.length buffer in
pick [ (cancelation () >>= fun () -> return None) ; pick [ (cancelation () >>= fun () -> return None) ;
(Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r -> (Lwt_bytes.recvfrom main_socket buffer 0 len [] >>= fun r ->
@ -712,7 +714,10 @@ module Make (P: NET_PARAMS) = struct
if len' <> len then if len' <> len then
step () (* drop bytes, better luck next time ! *) step () (* drop bytes, better luck next time ! *)
else else
answerable_discovery_message (Netbits.of_raw buffer) my_gid answerable_discovery_message
(Data_encoding.Binary.of_bytes
discovery_message_encoding buffer)
my_gid
(fun _ port -> (fun _ port ->
catch catch
(fun () -> (fun () ->
@ -732,14 +737,14 @@ module Make (P: NET_PARAMS) = struct
(* Sends dicover messages into space in an exponentially delayed loop, (* Sends dicover messages into space in an exponentially delayed loop,
restartable using a condition *) restartable using a condition *)
let discovery_sender my_gid disco_port inco_port cancelation restart = let discovery_sender my_gid disco_port inco_port cancelation restart =
let message = discovery_message my_gid inco_port in let msg = discovery_message my_gid inco_port in
let rec loop delay n = let rec loop delay n =
catch catch
(fun () -> (fun () ->
let socket = LU.(socket PF_INET SOCK_DGRAM 0) in let socket = LU.(socket PF_INET SOCK_DGRAM 0) in
LU.setsockopt socket LU.SO_BROADCAST true ; LU.setsockopt socket LU.SO_BROADCAST true ;
LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () -> LU.connect socket LU.(ADDR_INET (Unix.inet_addr_of_string "255.255.255.255", disco_port)) >>= fun () ->
Netbits.(write socket message) >>= fun _ -> Lwt_utils.write_mbytes socket msg >>= fun _ ->
LU.close socket) LU.close socket)
(fun _ -> (fun _ ->
debug "(%a) error broadcasting a discovery request" pp_gid my_gid ; debug "(%a) error broadcasting a discovery request" pp_gid my_gid ;

View File

@ -53,6 +53,9 @@ type limits = {
blacklist_time : float ; blacklist_time : float ;
} }
(** A global identifier for a peer, a.k.a. an identity *)
type gid
type 'msg msg_encoding = Encoding : { type 'msg msg_encoding = Encoding : {
tag: int ; tag: int ;
encoding: 'a Data_encoding.t ; encoding: 'a Data_encoding.t ;
@ -100,9 +103,6 @@ module Make (P : NET_PARAMS) : sig
(** A connection to a peer *) (** A connection to a peer *)
type peer type peer
(** A global identifier for a peer, a.k.a. an identity *)
type gid
(** Access the domain of active peers *) (** Access the domain of active peers *)
val peers : net -> peer list val peers : net -> peer list