diff --git a/src/lib_data_encoding/binary.ml b/src/lib_data_encoding/binary.ml index ed37f1f70..d2bc18fe0 100644 --- a/src/lib_data_encoding/binary.ml +++ b/src/lib_data_encoding/binary.ml @@ -20,10 +20,6 @@ type 'l writer = { write: 'a. 'a Encoding.t -> 'a -> MBytes.t -> int -> int ; } -type 'l reader = { - read: 'a. 'a Encoding.t -> MBytes.t -> int -> int -> (int * 'a) ; -} - let rec length : type x. x Encoding.t -> x -> int = fun e -> let open Encoding in match e.encoding with @@ -546,264 +542,6 @@ let to_bytes t v = write_rec_buffer t v bytes ; MBytes_buffer.to_mbytes bytes -(** Reader *) - -module Reader = struct - - let int8 buf ofs _len = - ofs + Binary_size.int8, MBytes.get_int8 buf ofs - - let uint8 buf ofs _len = - ofs + Binary_size.uint8, MBytes.get_uint8 buf ofs - - let char buf ofs _len = - ofs + Binary_size.char, MBytes.get_char buf ofs - - let bool buf ofs len = - let ofs, v = int8 buf ofs len in - ofs, v <> 0 - - let int16 buf ofs _len = - ofs + Binary_size.int16, MBytes.get_int16 buf ofs - - let uint16 buf ofs _len = - ofs + Binary_size.uint16, MBytes.get_uint16 buf ofs - - let uint30 buf ofs _len = - let v = Int32.to_int (MBytes.get_int32 buf ofs) in - if v < 0 then - failwith "Data_encoding.Binary.Reader.uint30: invalid data." ; - ofs + Binary_size.uint30, v - - let int31 buf ofs _len = - ofs + Binary_size.int31, Int32.to_int (MBytes.get_int32 buf ofs) - - let int32 buf ofs _len = - ofs + Binary_size.int32, MBytes.get_int32 buf ofs - - let int64 buf ofs _len = - ofs + Binary_size.int64, MBytes.get_int64 buf ofs - - let z buf ofs _len = - let res = Buffer.create 100 in - let rec read prev i value bit = - if prev land 0x80 = 0x00 then begin - if bit > 0 then Buffer.add_char res (Char.unsafe_chr value) ; - if prev = 0x00 then failwith "trailing zeroes in Z encoding" ; - i - end else - let byte = MBytes.get_uint8 buf (ofs + i) in - let value = value lor ((byte land 0x7F) lsl bit) in - let bit = bit + 7 in - let bit, value = if bit >= 8 then begin - Buffer.add_char res (Char.unsafe_chr (value land 0xFF)) ; - bit - 8, value lsr 8 - end else bit, value in - read byte (i + 1) value bit in - let first = MBytes.get_uint8 buf ofs in - if first = 0 then - ofs + 1, Z.zero - else - let value = first land 0x3F in - let sign = (first land 0x40) <> 0 in - let length = read first 1 value 6 in - let bits = Buffer.contents res in - let res = Z.of_bits bits in - let res = if sign then Z.neg res else res in - ofs + length, res - - (** read a float64 (double) **) - let float buf ofs _len = - (*Here, float means float64, which is read using MBytes.get_double !!*) - ofs + Binary_size.float, MBytes.get_double buf ofs - - let int_of_int32 i = - let i' = Int32.to_int i in - let i'' = Int32.of_int i' in - if i'' = i then - i' - else - invalid_arg "int_of_int32 overflow" - - let fixed_length_bytes length buf ofs _len = - let s = MBytes.sub buf ofs length in - ofs + length, s - - let fixed_length_string length buf ofs _len = - let s = MBytes.sub_string buf ofs length in - ofs + length, s - - let seq r1 r2 buf ofs len = - let ofs', v1 = r1 buf ofs len in - let ofs'', v2 = r2 buf ofs' (len - (ofs' - ofs)) in - ofs'', (v1, v2) - - let varseq r e1 e2 buf ofs len = - let k1 = Encoding.classify e1 - and k2 = Encoding.classify e2 in - match k1, k2 with - | (`Dynamic | `Fixed _), `Variable -> - let ofs', v1 = r.read e1 buf ofs len in - let ofs'', v2 = r.read e2 buf ofs' (len - (ofs' - ofs)) in - ofs'', (v1, v2) - | `Variable, `Fixed n -> - let ofs', v1 = r.read e1 buf ofs (len - n) in - let ofs'', v2 = r.read e2 buf ofs' n in - ofs'', (v1, v2) - | _ -> assert false (* Should be rejected by Kind.combine *) - - let list read buf ofs len = - let rec loop acc ofs len = - assert (len >= 0); - if len <= 0 - then ofs, List.rev acc - else - let ofs', v = read buf ofs len in - assert (ofs' > ofs); - loop (v :: acc) ofs' (len - (ofs' - ofs)) - in - loop [] ofs len - - let array read buf ofs len = - let ofs, l = list read buf ofs len in - ofs, Array.of_list l - - let conv inj r buf ofs len = - let ofs, v = r buf ofs len in - ofs, inj v - - let read_tag = function - | `Uint8 -> uint8 - | `Uint16 -> uint16 - - let union r sz cases = - let open Encoding in - let read_cases = - TzList.filter_map - (function - | (Case { tag = Json_only }) -> None - | (Case { encoding = e ; inj ; tag = Tag tag }) -> - let read = r.read e in - Some (tag, fun len buf ofs -> - let ofs, v = read len buf ofs in - ofs, inj v)) - cases in - fun buf ofs len -> - let ofs, tag = read_tag sz buf ofs len in - try List.assoc tag read_cases buf ofs (len - Binary_size.tag_size sz) - with Not_found -> raise (Unexpected_tag tag) - -end - -let rec read_rec : type a. a Encoding.t-> MBytes.t -> int -> int -> int * a = fun e -> - let open Encoding in - let open Reader in - match e.encoding with - | Null -> (fun _buf ofs _len -> ofs, ()) - | Empty -> (fun _buf ofs _len -> ofs, ()) - | Constant _ -> (fun _buf ofs _len -> ofs, ()) - | Ignore -> (fun _buf ofs len -> ofs + len, ()) - | Bool -> bool - | Int8 -> int8 - | Uint8 -> uint8 - | Int16 -> int16 - | Uint16 -> uint16 - | Int31 -> int31 - | Int32 -> int32 - | Int64 -> int64 - | Z -> z - | RangedInt { minimum ; maximum } -> - (fun buf ofs alpha -> - let ofs, value = - match Binary_size.range_to_size ~minimum ~maximum with - | `Int8 -> int8 buf ofs alpha - | `Int16 -> int16 buf ofs alpha - | `Int31 -> int31 buf ofs alpha - | `Uint8 -> uint8 buf ofs alpha - | `Uint16 -> uint16 buf ofs alpha - | `Uint30 -> uint30 buf ofs alpha in - let value = if minimum > 0 then value + minimum else value in - if value < minimum || value > maximum - then raise (Int_out_of_range (value, minimum, maximum)) ; - (ofs, value)) - | Float -> float - | RangedFloat { minimum ; maximum } -> - (fun buf ofs len -> - let offset, value = float buf ofs len in - if value < minimum || value > maximum - then raise (Float_out_of_range (value, minimum, maximum)) ; - (offset, value)) - | Bytes (`Fixed n) -> fixed_length_bytes n - | String (`Fixed n) -> fixed_length_string n - | Bytes `Variable -> fun buf ofs len -> fixed_length_bytes len buf ofs len - | String `Variable -> fun buf ofs len -> fixed_length_string len buf ofs len - | String_enum (_, arr) -> begin - fun buf ofs a -> - let ofs, ind = - match Binary_size.enum_size arr with - | `Uint8 -> uint8 buf ofs a - | `Uint16 -> uint16 buf ofs a - | `Uint30 -> uint30 buf ofs a in - if ind >= Array.length arr - then raise No_case_matched - else (ofs, arr.(ind)) - end - | Array e -> array (read_rec e) - | List e -> list (read_rec e) - | Obj (Req (_, e)) -> read_rec e - | Obj (Opt (`Dynamic, _, t)) -> - let read = read_rec t in - (fun buf ofs len -> - let ofs, v = int8 buf ofs len in - if v = 0 then ofs, None - else let ofs, v = read buf ofs (len - Binary_size.int8) in ofs, Some v) - | Obj (Opt (`Variable, _, t)) -> - let read = read_rec t in - (fun buf ofs len -> - if len = 0 then ofs, None - else - let ofs', v = read buf ofs len in - assert (ofs' = ofs + len) ; - ofs + len, Some v) - | Obj (Dft (_, e, _)) -> read_rec e - | Objs ((`Fixed _ | `Dynamic), e1, e2) -> - seq (read_rec e1) (read_rec e2) - | Objs (`Variable, e1, e2) -> - varseq { read = fun t -> read_rec t } e1 e2 - | Tup e -> read_rec e - | Tups ((`Fixed _ | `Dynamic), e1, e2) -> - seq (read_rec e1) (read_rec e2) - | Tups (`Variable, e1, e2) -> - varseq { read = fun t -> read_rec t } e1 e2 - | Conv { inj ; encoding = e } -> conv inj (read_rec e) - | Describe { encoding = e } -> read_rec e - | Def { encoding = e } -> read_rec e - | Splitted { encoding = e } -> read_rec e - | Union (_, sz, cases) -> - union { read = fun t -> read_rec t } sz cases - | Mu (_, _, self) -> fun buf ofs len -> read_rec (self e) buf ofs len - | Dynamic_size e -> - let read = read_rec e in - fun buf ofs len -> - let ofs, sz = int32 buf ofs len in - let sz = Int32.to_int sz in - if sz < 0 then raise (Invalid_size sz); - read buf ofs sz - | Delayed f -> read_rec (f ()) - -let read t buf ofs len = - try Some (read_rec t buf ofs len) - with _ -> None -let write = write -let of_bytes_exn ty buf = - let len = MBytes.length buf in - let read_len, r = read_rec ty buf 0 len in - if read_len <> len then - failwith "Data_encoding.Binary.of_bytes_exn: remainig data" ; - r -let of_bytes ty buf = - try Some (of_bytes_exn ty buf) - with _ -> None let to_bytes = to_bytes let length = length diff --git a/src/lib_data_encoding/binary.mli b/src/lib_data_encoding/binary.mli index ab4c0cb68..7c1a5b8d0 100644 --- a/src/lib_data_encoding/binary.mli +++ b/src/lib_data_encoding/binary.mli @@ -11,10 +11,7 @@ use the corresponding module intended for use: {Data_encoding.Binary}. *) val length : 'a Encoding.t -> 'a -> int -val read : 'a Encoding.t -> MBytes.t -> int -> int -> (int * 'a) option val write : 'a Encoding.t -> 'a -> MBytes.t -> int -> int option val to_bytes : 'a Encoding.t -> 'a -> MBytes.t -val of_bytes : 'a Encoding.t -> MBytes.t -> 'a option -val of_bytes_exn : 'a Encoding.t -> MBytes.t -> 'a val fixed_length : 'a Encoding.t -> int option val fixed_length_exn : 'a Encoding.t -> int diff --git a/src/lib_data_encoding/binary_error.ml b/src/lib_data_encoding/binary_error.ml new file mode 100644 index 000000000..85509ed56 --- /dev/null +++ b/src/lib_data_encoding/binary_error.ml @@ -0,0 +1,38 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type read_error = + | Not_enough_data + | Extra_bytes + | No_case_matched + | Unexpected_tag of int + | Invalid_size of int + | Invalid_int of { min : int ; v : int ; max : int } + | Invalid_float of { min : float ; v : float ; max : float } + | Trailing_zero + +let pp_read_error ppf = function + | Not_enough_data -> + Format.fprintf ppf "Not enough data" + | Extra_bytes -> + Format.fprintf ppf "Extra bytes" + | No_case_matched -> + Format.fprintf ppf "No case matched" + | Unexpected_tag tag -> + Format.fprintf ppf "Unexpected tag %d" tag + | Invalid_size sz -> + Format.fprintf ppf "Invalid size %d" sz + | Invalid_int { min ; v ; max} -> + Format.fprintf ppf "Invalid int (%d <= %d <= %d) " min v max + | Invalid_float { min ; v ; max} -> + Format.fprintf ppf "Invalid float (%f <= %f <= %f) " min v max + | Trailing_zero -> + Format.fprintf ppf "Trailing zero in Z" + +exception Read_error of read_error diff --git a/src/lib_data_encoding/binary_error.mli b/src/lib_data_encoding/binary_error.mli new file mode 100644 index 000000000..69498bfe1 --- /dev/null +++ b/src/lib_data_encoding/binary_error.mli @@ -0,0 +1,23 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** This is for use *within* the data encoding library only. Instead, you should + use the corresponding module intended for use: {Data_encoding.Binary}. *) + +type read_error = + | Not_enough_data + | Extra_bytes + | No_case_matched + | Unexpected_tag of int + | Invalid_size of int + | Invalid_int of { min : int ; v : int ; max : int } + | Invalid_float of { min : float ; v : float ; max : float } + | Trailing_zero +exception Read_error of read_error +val pp_read_error: Format.formatter -> read_error -> unit diff --git a/src/lib_data_encoding/binary_reader.ml b/src/lib_data_encoding/binary_reader.ml new file mode 100644 index 000000000..bd2936d98 --- /dev/null +++ b/src/lib_data_encoding/binary_reader.ml @@ -0,0 +1,285 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Binary_error + +let raise e = raise (Read_error e) + +type state = { + buffer : MBytes.t ; + mutable offset : int ; + mutable remaining_bytes : int ; +} + +let check_remaining_bytes state size = + if state.remaining_bytes < size then + raise Not_enough_data ; + state.remaining_bytes - size + +let read_atom size conv state = + let remaining_bytes = check_remaining_bytes state size in + let res = conv state.buffer state.offset in + state.offset <- state.offset + size ; + state.remaining_bytes <- remaining_bytes ; + res + +(** Reader for all the atomic types. *) +module Atom = struct + + let uint8 = read_atom Binary_size.uint8 MBytes.get_uint8 + let uint16 = read_atom Binary_size.int16 MBytes.get_uint16 + + let int8 = read_atom Binary_size.int8 MBytes.get_int8 + let int16 = read_atom Binary_size.int16 MBytes.get_int16 + let int32 = read_atom Binary_size.int32 MBytes.get_int32 + let int64 = read_atom Binary_size.int64 MBytes.get_int64 + + let float = read_atom Binary_size.float MBytes.get_double + + let bool state = int8 state <> 0 + + let uint30 = + read_atom Binary_size.uint30 @@ fun buffer ofs -> + let v = Int32.to_int (MBytes.get_int32 buffer ofs) in + if v < 0 then + raise (Invalid_int { min = 0 ; v ; max = (1 lsl 30) - 1 }) ; + v + + let int31 = + read_atom Binary_size.int31 @@ fun buffer ofs -> + Int32.to_int (MBytes.get_int32 buffer ofs) + + let ranged_int ~minimum ~maximum state = + let read_int = + match Binary_size.range_to_size ~minimum ~maximum with + | `Int8 -> int8 + | `Int16 -> int16 + | `Int31 -> int31 + | `Uint8 -> uint8 + | `Uint16 -> uint16 + | `Uint30 -> uint30 in + let ranged = read_int state in + let ranged = if minimum > 0 then ranged + minimum else ranged in + if not (minimum <= ranged && ranged <= maximum) then + raise (Invalid_int { min = minimum ; v =ranged ; max = maximum }) ; + ranged + + let ranged_float ~minimum ~maximum state = + let ranged = float state in + if not (minimum <= ranged && ranged <= maximum) then + raise (Invalid_float { min = minimum ; v = ranged ; max = maximum }) ; + ranged + + let z state = + let res = Buffer.create 100 in + let first = uint8 state in + if first = 0 then + Z.zero + else + let first_value = first land 0x3F in + let sign = (first land 0x40) <> 0 in + let rec read prev value bit state = + if prev land 0x80 = 0x00 then begin + if bit > 0 then Buffer.add_char res (Char.unsafe_chr value) ; + if prev = 0x00 then raise Trailing_zero ; + let bits = Buffer.contents res in + let res = Z.of_bits bits in + if sign then Z.neg res else res + end else + let byte = uint8 state in + let value = value lor ((byte land 0x7F) lsl bit) in + let bit = bit + 7 in + let bit, value = + if bit >= 8 then begin + Buffer.add_char res (Char.unsafe_chr (value land 0xFF)) ; + bit - 8, value lsr 8 + end else + bit, value in + read byte value bit state in + read first first_value 6 state + + let string_enum arr state = + let read_index = + match Binary_size.enum_size arr with + | `Uint8 -> uint8 + | `Uint16 -> uint16 + | `Uint30 -> uint30 in + let index = read_index state in + if index >= Array.length arr then + raise No_case_matched ; + arr.(index) + + let fixed_length_bytes length = + read_atom length @@ fun buf ofs -> + MBytes.sub buf ofs length + + let fixed_length_string length = + read_atom length @@ fun buf ofs -> + MBytes.sub_string buf ofs length + + let tag = function + | `Uint8 -> uint8 + | `Uint16 -> uint16 + +end + +(** Main recursive reading function, in continuation passing style. *) +let rec read_rec : type ret. ret Encoding.t -> state -> ret + = fun e state -> + let open Encoding in + match e.encoding with + | Null -> () + | Empty -> () + | Constant _ -> () + | Ignore -> () + | Bool -> Atom.bool state + | Int8 -> Atom.int8 state + | Uint8 -> Atom.uint8 state + | Int16 -> Atom.int16 state + | Uint16 -> Atom.uint16 state + | Int31 -> Atom.int31 state + | Int32 -> Atom.int32 state + | Int64 -> Atom.int64 state + | Z -> Atom.z state + | Float -> Atom.float state + | Bytes (`Fixed n) -> Atom.fixed_length_bytes n state + | Bytes `Variable -> + Atom.fixed_length_bytes state.remaining_bytes state + | String (`Fixed n) -> Atom.fixed_length_string n state + | String `Variable -> + Atom.fixed_length_string state.remaining_bytes state + | RangedInt { minimum ; maximum } -> + Atom.ranged_int ~minimum ~maximum state + | RangedFloat { minimum ; maximum } -> + Atom.ranged_float ~minimum ~maximum state + | String_enum (_, arr) -> + Atom.string_enum arr state + | Array e -> + let l = read_list e state in + Array.of_list l + | List e -> read_list e state + | (Obj (Req (_, e))) -> read_rec e state + | (Obj (Dft (_, e, _))) -> read_rec e state + | (Obj (Opt (`Dynamic, _, e))) -> + let present = Atom.bool state in + if not present then + None + else + Some (read_rec e state) + | (Obj (Opt (`Variable, _, e))) -> + if state.remaining_bytes = 0 then + None + else + Some (read_rec e state) + | Objs (`Fixed sz, e1, e2) -> + ignore (check_remaining_bytes state sz : int) ; + let left = read_rec e1 state in + let right = read_rec e2 state in + (left, right) + | Objs (`Dynamic, e1, e2) -> + let left = read_rec e1 state in + let right = read_rec e2 state in + (left, right) + | (Objs (`Variable, e1, e2)) -> + read_variable_pair e1 e2 state + | Tup e -> read_rec e state + | Tups (`Fixed sz, e1, e2) -> + ignore (check_remaining_bytes state sz : int) ; + let left = read_rec e1 state in + let right = read_rec e2 state in + (left, right) + | Tups (`Dynamic, e1, e2) -> + let left = read_rec e1 state in + let right = read_rec e2 state in + (left, right) + | (Tups (`Variable, e1, e2)) -> + read_variable_pair e1 e2 state + | Conv { inj ; encoding } -> + inj (read_rec encoding state) + | Union (_, sz, cases) -> + let ctag = Atom.tag sz state in + let Case { encoding ; inj } = + try + List.find + (function + | Case { tag = Tag tag } -> tag = ctag + | Case { tag = Json_only } -> false) + cases + with Not_found -> raise (Unexpected_tag ctag) in + inj (read_rec encoding state) + | Dynamic_size e -> + let sz = Atom.int32 state in + let sz = Int32.to_int sz in + if sz < 0 then raise (Invalid_size sz) ; + let remaining = check_remaining_bytes state sz in + state.remaining_bytes <- sz ; + let v = read_rec e state in + if state.remaining_bytes <> 0 then raise Extra_bytes ; + state.remaining_bytes <- remaining ; + v + | Describe { encoding = e } -> read_rec e state + | Def { encoding = e } -> read_rec e state + | Splitted { encoding = e } -> read_rec e state + | Mu (_, _, self) -> read_rec (self e) state + | Delayed f -> read_rec (f ()) state + + +and read_variable_pair + : type left right. + left Encoding.t -> right Encoding.t -> state -> (left * right) + = fun e1 e2 state -> + match Encoding.classify e1, Encoding.classify e2 with + | (`Dynamic | `Fixed _), `Variable -> + let left = read_rec e1 state in + let right = read_rec e2 state in + (left, right) + | `Variable, `Fixed n -> + if n > state.remaining_bytes then raise Not_enough_data ; + state.remaining_bytes <- state.remaining_bytes - n ; + let left = read_rec e1 state in + assert (state.remaining_bytes = 0) ; + state.remaining_bytes <- n ; + let right = read_rec e2 state in + assert (state.remaining_bytes = 0) ; + (left, right) + | _ -> assert false (* Should be rejected by [Encoding.Kind.combine] *) + +and read_list : type a. a Encoding.t -> state -> a list + = fun e state -> + let rec loop acc = + if state.remaining_bytes = 0 then + List.rev acc + else + let v = read_rec e state in + loop (v :: acc) in + loop [] + + + +(** ******************** *) +(** Various entry points *) + +let read encoding buffer ofs len = + let state = + { buffer ; offset = ofs ; remaining_bytes = len } in + match read_rec encoding state with + | exception Read_error _ -> None + | v -> Some (state.offset, v) + +let of_bytes_exn encoding buffer = + let len = MBytes.length buffer in + let state = + { buffer ; offset = 0 ; remaining_bytes = len } in + let v = read_rec encoding state in + if state.offset <> len then raise Extra_bytes ; + v + +let of_bytes encoding buffer = + try Some (of_bytes_exn encoding buffer) + with Read_error _ -> None diff --git a/src/lib_data_encoding/binary_reader.mli b/src/lib_data_encoding/binary_reader.mli new file mode 100644 index 000000000..1b6622d60 --- /dev/null +++ b/src/lib_data_encoding/binary_reader.mli @@ -0,0 +1,15 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** This is for use *within* the data encoding library only. Instead, you should + use the corresponding module intended for use: {Data_encoding.Binary}. *) + +val read: 'a Encoding.t -> MBytes.t -> int -> int -> (int * 'a) option +val of_bytes: 'a Encoding.t -> MBytes.t -> 'a option +val of_bytes_exn: 'a Encoding.t -> MBytes.t -> 'a diff --git a/src/lib_data_encoding/binary_size.ml b/src/lib_data_encoding/binary_size.ml index 0109ebd40..2e8c047a0 100644 --- a/src/lib_data_encoding/binary_size.ml +++ b/src/lib_data_encoding/binary_size.ml @@ -27,7 +27,6 @@ let tag_size = function | `Uint8 -> uint8 | `Uint16 -> uint16 - type signed_integer = [ `Int31 | `Int16 | `Int8 ] type unsigned_integer = [ `Uint30 | `Uint16 | `Uint8 ] type integer = [ signed_integer | unsigned_integer ] @@ -62,4 +61,3 @@ let range_to_size ~minimum ~maximum : integer = let enum_size arr = unsigned_range_to_size (Array.length arr) - diff --git a/src/lib_data_encoding/binary_stream.ml b/src/lib_data_encoding/binary_stream.ml deleted file mode 100644 index 9a807db67..000000000 --- a/src/lib_data_encoding/binary_stream.ml +++ /dev/null @@ -1,470 +0,0 @@ - -(* Facilities to decode streams of binary data *) - -type 'a status = - | Success of { res : 'a ; res_len : int ; remaining : MBytes.t list } - | Await of (MBytes.t -> 'a status) - | Error - -(* used as a zipper to code the function read_checker with the - ability to stop and wait for more data. In 'P_seq' case, data - length is parameterized by the current offset. Hence, it's a - function 'fun_data_len'. For the 'P_list' case, we store the - base offset (before starting reading the elements) and the - number of elements that have been read so far. *) -type path = - | P_top : path - | P_await : { path : path ; encoding : 'a Encoding.t ; data_len : int } -> path - | P_seq : { path : path ; encoding : 'a Encoding.t ; - fun_data_len : int -> int } -> path - | P_list : { path:path ; encoding:'a Encoding.t ; data_len : int ; - base_ofs : int ; nb_elts_read : int } -> path - -(* used to accumulate given mbytes when reading a list of blocks, - as well as the current offset and the number of unread bytes *) -type mbytes_stream = { - past : MBytes.t Queue.t ; (* data that have been entirely read *) - future : (MBytes.t * int) Queue.t ; (* data that are not (fully) read *) - mutable past_len : int ; (*length of concatenation of data in 'past'*) - mutable unread : int ; (*number of cells that are unread in 'future'*) - ofs : int (*current absolute offset wrt to concatenation past @ future*) -} - -(* exception raised when additional mbytes are needed to continue - decoding *) -exception Need_more_data of mbytes_stream - -(* read a data that is stored in may Mbytes *) -let read_from_many_blocks reader buf ofs d_ofs = - let tmp = MBytes.create d_ofs in (*we will merge data in this mbyte*) - let r = ref d_ofs in (*to count the cells to be read*) - let rel_ofs = ref ofs in (*= ofs for first mbyte, 0 for others*) - while !r > 0 do - assert (not (Queue.is_empty buf.future)) ; - let b, len_b = Queue.peek buf.future in (*take the next mbyte*) - let len_chunk = len_b - !rel_ofs in (*the number of cells to read*) - if !r >= len_chunk then - begin (*copy b in 'past' if it is read entirely*) - ignore (Queue.pop buf.future) ; - Queue.push b buf.past ; - buf.past_len <- buf.past_len + len_b ; - end ; - (* copy (min !r len_chunk) data from b to tmp *) - MBytes.blit b !rel_ofs tmp (d_ofs - !r) (min !r len_chunk) ; - r := !r - len_chunk ; (* len_chunk data read during this round*) - rel_ofs := 0 ; (*next mbytes will be read starting from zero*) - done ; - reader tmp 0 d_ofs - - -(* generic function that reads data from an mbytes_stream. It is - parameterized by a function "reader" that effectively reads the - data *) -let generic_read_data delta_ofs reader buf = - let absolute_ofs = buf.ofs in - if buf.unread < delta_ofs then (*not enough data*) - raise (Need_more_data buf) ; - if delta_ofs = 0 then (*we'll read nothing*) - buf, reader (MBytes.create 0) 0 0 - else - let new_ofs = absolute_ofs + delta_ofs in - let ofs = absolute_ofs - buf.past_len in (*relative ofs wrt 'future'*) - buf.unread <- buf.unread-delta_ofs ; (*'delta_ofs' cells will be read*) - assert (not (Queue.is_empty buf.future)) ; (*we have some data to read*) - let b, len_b = Queue.peek buf.future in - let buf = { buf with ofs = new_ofs } in - if ofs + delta_ofs > len_b then - (*should read data from many mbytes*) - buf, read_from_many_blocks reader buf ofs delta_ofs - else - begin - if ofs + delta_ofs = len_b then - begin (*the rest of b will be entirely read. Put it in 'past'*) - ignore (Queue.pop buf.future) ; - Queue.push b buf.past ; - buf.past_len <- buf.past_len + len_b ; - end ; - buf, reader b ofs delta_ofs - end - -open Encoding (* open here, shadow below, use shadowed definitions later *) - -(* functions that try to read data from a given mbytes_stream, - or raise Need_more_data *) - -let int8 buf = - generic_read_data Binary_size.int8 (fun x y _ -> MBytes.get_int8 x y) buf - -let uint8 buf = - generic_read_data Binary_size.uint8 (fun x y _ -> MBytes.get_uint8 x y) buf - -let char buf = - let buf, v = int8 buf in - buf, Char.chr v - -let bool buf = - let buf, v = int8 buf in - buf, v <> 0 - -let int16 buf = - generic_read_data Binary_size.int16 (fun x y _ -> MBytes.get_int16 x y) buf - -let uint16 buf = - generic_read_data Binary_size.uint16 (fun x y _ -> MBytes.get_uint16 x y) buf - -let uint30 buf = - generic_read_data Binary_size.uint30 - (fun x y _ -> - let v = Int32.to_int (MBytes.get_int32 x y) in - if v < 0 then - failwith "Data_encoding.Binary.Reader.uint30: invalid data." ; - v) buf - -let int31 buf = - generic_read_data Binary_size.int31 - (fun x y _ -> Int32.to_int (MBytes.get_int32 x y)) buf - -let int32 buf = - generic_read_data Binary_size.int32 (fun x y _ -> MBytes.get_int32 x y) buf - -let int64 buf = - generic_read_data Binary_size.int64 (fun x y _ -> MBytes.get_int64 x y) buf - -(** read a float64 (double) **) -let float buf = - (*Here, float means float64, which is read using MBytes.get_double !!*) - generic_read_data Binary_size.float (fun x y _ -> MBytes.get_double x y) buf - -let fixed_length_bytes length buf = - generic_read_data length MBytes.sub buf - -let fixed_length_string length buf = - generic_read_data length MBytes.sub_string buf - -let read_tag = function - | `Uint8 -> uint8 - | `Uint16 -> uint16 - -(* auxiliary function: computing size of data in branches - Objs(`Variable) and Tups(`Variable) *) -let varseq_lengths e1 e2 ofs len = match Encoding.classify e1, Encoding.classify e2 with - | (`Dynamic | `Fixed _), `Variable -> len, (fun ofs' -> len - ofs' + ofs) - | `Variable, `Fixed n -> (len - n), (fun _ -> n) - | _ -> assert false (* Should be rejected by Kind.combine *) - - -(* adaptation of function read_rec to check binary data - incrementally. The function takes (and returns) a 'path' (for - incrementality), and 'mbytes_stream' *) -let rec data_checker - : type a. - path -> a Encoding.t -> mbytes_stream -> int -> - path * mbytes_stream = - fun path e buf len -> - (*length of data with `Variable kind should be given by the caller*) - assert (Encoding.classify e != `Variable || len >= 0) ; - try match e.encoding with - | Null -> next_path path buf - | Empty -> next_path path buf - | Constant _ -> next_path path buf - | Ignore -> next_path path { buf with ofs = buf.ofs + len } - | Bool -> next_path path (fst (bool buf)) - | Int8 -> next_path path (fst (int8 buf)) - | Uint8 -> next_path path (fst (uint8 buf)) - | Int16 -> next_path path (fst (int16 buf)) - | Uint16 -> next_path path (fst (uint16 buf)) - | Int31 -> next_path path (fst (int31 buf)) - | Int32 -> next_path path (fst (int32 buf)) - | Int64 -> next_path path (fst (int64 buf)) - | Z -> - let rec while_not_terminator i buf = - let buf, byte = uint8 buf in - if (byte land 0x80) = 0x00 then - if byte = 0x00 && i <> 0 then - failwith "trailing zeroes in Z encoding" - else - next_path path buf - else - while_not_terminator (i + 1) buf in - while_not_terminator 0 buf - | RangedInt { minimum ; maximum } -> - let (stream, ranged) = - match Binary_size.range_to_size ~minimum ~maximum with - | `Int8 -> int8 buf - | `Int16 -> int16 buf - | `Int31 -> int31 buf - | `Uint8 -> uint8 buf - | `Uint16 -> uint16 buf - | `Uint30 -> uint30 buf in - let ranged = if minimum > 0 then ranged + minimum else ranged in - assert (minimum <= ranged && ranged <= maximum) ; - next_path path stream - | Float -> next_path path (fst (float buf)) - | RangedFloat { minimum ; maximum } -> - let stream, float = float buf in - assert (minimum <= float && maximum >= float) ; - next_path path stream - | Bytes (`Fixed n) -> - next_path path (fst (fixed_length_bytes n buf)) - - | String (`Fixed n) -> - next_path path (fst (fixed_length_string n buf)) - - | Bytes `Variable -> - next_path path (fst (fixed_length_bytes len buf)) - - | String `Variable -> - next_path path (fst (fixed_length_string len buf)) - - | String_enum (_, arr) -> - next_path path - (match Binary_size.enum_size arr with - | `Uint8 -> fst @@ uint8 buf - | `Uint16 -> fst @@ uint16 buf - | `Uint30 -> fst @@ uint30 buf) - - | Array e -> - let p = P_list { path ; encoding = e ; base_ofs = buf.ofs ; - data_len = len ; nb_elts_read = 0 } in - next_path p buf - - | List e -> - let p = P_list { path ; encoding = e ; base_ofs = buf.ofs ; - data_len = len ; nb_elts_read = 0 } in - next_path p buf - - | Obj (Req (_, e)) -> data_checker path e buf len - - | Obj (Opt (`Dynamic, _, e)) -> - let buf, v = int8 buf in - if v = 0 then next_path path buf - else data_checker path e buf (len - Binary_size.int8) - - | Obj (Opt (`Variable, _, e)) -> - if len = 0 then next_path path buf - else data_checker path e buf len - - | Obj (Dft (_, e, _)) -> data_checker path e buf len - - | Objs ((`Fixed _ | `Dynamic), e1, e2) -> - let f_len2 ofs' = len - (ofs' - buf.ofs) in - let path = - P_seq { path ; encoding = e2 ; fun_data_len = f_len2 } in - data_checker path e1 buf len - - | Objs (`Variable, e1, e2) -> - let len1, f_len2 = varseq_lengths e1 e2 buf.ofs len in - let path = - P_seq { path ; encoding = e2 ; fun_data_len = f_len2 } in - data_checker path e1 buf len1 - - | Tup e -> data_checker path e buf len - - | Tups ((`Fixed _ | `Dynamic), e1, e2) -> - let f_len2 ofs' = len - (ofs' - buf.ofs) in - let path = - P_seq { path ; encoding = e2 ; fun_data_len = f_len2 } in - data_checker path e1 buf len - - | Tups (`Variable, e1, e2) -> - let len1, f_len2 = varseq_lengths e1 e2 buf.ofs len in - let path = - P_seq { path ; encoding = e2 ; fun_data_len = f_len2 } in - data_checker path e1 buf len1 - - | Conv { encoding = e } -> data_checker path e buf len - - | Describe { encoding = e } -> data_checker path e buf len - - | Def { encoding = e } -> data_checker path e buf len - - | Splitted { encoding = e } -> data_checker path e buf len - - | Mu (_, _, self) -> data_checker path (self e) buf len - - | Union (_, sz, cases) -> - let buf, ctag = read_tag sz buf in - let opt = - List.fold_left - (fun acc c -> match c with - | (Case { encoding ; tag = Tag tag }) - when tag == ctag -> - assert (acc == None) ; - Some (data_checker path encoding buf) - | _ -> acc - )None cases - in - begin match opt with - | None -> raise (Encoding.Unexpected_tag ctag) - | Some func -> func (len - (Binary_size.tag_size sz)) - end - - | Dynamic_size e -> - let buf, sz = int32 buf in - let sz = Int32.to_int sz in - if sz < 0 then raise (Encoding.Invalid_size sz) ; - data_checker path e buf sz - - | Delayed f -> data_checker path (f ()) buf len - - with Need_more_data buf -> - P_await { path ; encoding = e ; data_len = len }, buf - -and next_path : path -> mbytes_stream -> path * mbytes_stream = - fun path buf -> - match path with - | P_top -> - P_top, buf (* success case *) - - | P_seq { path ; encoding ; fun_data_len } -> - (* check the right branch of a sequence. fun_data_len ofs gives - the length of the data to read *) - data_checker path encoding buf (fun_data_len buf.ofs) - - | P_await { path ; encoding ; data_len } -> - (* resume from an await *) - data_checker path encoding buf data_len - - | P_list - ({ path ; encoding ; base_ofs ; data_len ; nb_elts_read } as r) -> - (* read/check an eventual element of a list *) - if data_len = buf.ofs - base_ofs then - (* we've read all the elements of the list *) - next_path path buf - else - begin - (*some more elements to read*) - assert (data_len > buf.ofs - base_ofs) ; - (*check: if we've already read some elements, then currrent ofs - should be greater then initial ofs *) - assert (nb_elts_read <= 0 || buf.ofs - base_ofs > 0) ; - let path = - P_list { r with nb_elts_read = nb_elts_read + 1} in - data_checker path encoding buf data_len - end - -let data_checker = next_path - -(* insert a given MBytes.t in a given mbytes_stream *) -let insert_mbytes mb_buf mb = - let len = MBytes.length mb in - if len > 0 then begin - Queue.push (mb, len) mb_buf.future ; - mb_buf.unread <- mb_buf.unread + len ; - end - -(* aux function called when data_checker succeeds: splits a given - mbytes_stream into a 'read' and 'unread' queues. This may - modify the content of the given mbytes_stream *) -let split_mbytes_stream { past_len ; past ; future ; unread ; ofs } = - let rel_ofs = ofs - past_len in - assert (rel_ofs >= 0) ; - if rel_ofs = 0 then past, future (* already done *) - else begin - assert (not(Queue.is_empty future)) ; (*because data_checker succeeded*) - let b, len = Queue.pop future in - assert (rel_ofs < len) ; (*inv. maintained by read_from_many_blocks*) - let b1 = MBytes.sub b 0 rel_ofs in (* read part of b *) - let b2 = MBytes.sub b rel_ofs (len-rel_ofs) in (* unread part of b *) - Queue.push b1 past ; - - (* push b2 at the beginning of 'future' using Queue.transfer*) - let tmp = Queue.create() in - Queue.push (b2, unread) tmp ; - Queue.transfer future tmp ; (*tmp === b2 ::: future in constant time*) - past, tmp - end - -(* given a state, this function returns a new status: - - if data are successfully checked, accumulated mbytes are - passed to 'success_result' that computes the final - result. Unread mbytes are also returned - - if some more data are needed, a function that waits for some - additional mbytes is returned - - eventual errors are reported/returned *) -let rec bytes_stream_reader_rec (path, mb_buf) success_result = - let success = - match path with - | P_top -> true - | P_await _ -> false - | _ -> assert false - in - assert (mb_buf.ofs >= mb_buf.past_len) ; - if success then - let q_read, q_unread = split_mbytes_stream mb_buf in - match success_result q_read mb_buf.ofs with - | Some a -> - let remaining = - List.rev @@ - Queue.fold - (fun acc (b, len) -> - if len = 0 then acc else b:: acc) [] q_unread - in - Success { res = a ; res_len = mb_buf.ofs ; remaining } - | None -> Error - (* success_result may fail because data_checker is - approximative in some situations *) - else - Await - (fun mb -> - insert_mbytes mb_buf mb ; - try - let state = data_checker path mb_buf in - bytes_stream_reader_rec state success_result - with _ -> Error) - -(* This function checks reading a stream of 'MBytes.t' wrt. a given - encoding: - - the given data encoding should have a 'Fixed' or a 'Dynamic' - size, otherwise an error is returned, - - the function returns an 'Error', a function w - ('Await w') that waits for more data (Mbytes.t), or - 'Success'. The function is parameterized by 'success_result' - that computes the data to return in case of success. - An exception 'Invalid_argument "streaming data with variable - size"' is raised if the encoding has a variable size *) -let bytes_stream_reader : - MBytes.t list -> 'a t -> - (MBytes.t Queue.t -> int -> 'b option) -> 'b status - = fun l e success_result -> - match classify e with - | `Variable -> invalid_arg "streaming data with variable size" - | `Fixed _ | `Dynamic -> - let mb_buf = { - past = Queue.create() ; past_len = 0 ; - future = Queue.create() ; unread = 0; ofs = 0 } - in - List.iter (insert_mbytes mb_buf) l ; - let path = - P_await { path = P_top ; encoding = e ; data_len = - 1 } in - try bytes_stream_reader_rec (data_checker path mb_buf) success_result - with _ -> Error - -(* concats a queue of mbytes into one MByte *) -let concat_mbyte_chunks queue tot_len = - if Queue.length queue = 1 then Queue.pop queue (* no copy *) - else (* copy smaller mbytes into one big mbyte *) - let buf = MBytes.create tot_len in - let cpt = ref 0 in - let tot_len' = ref tot_len in - while not (Queue.is_empty queue) do - let mb = Queue.pop queue in - let len = MBytes.length mb in - tot_len' := !tot_len' - len ; - assert (!tot_len' >= 0) ; - MBytes.blit mb 0 buf !cpt len ; - cpt := !cpt + len ; - done ; - assert (!tot_len' = 0) ; - buf - -(* Decode a stream of MBytes. see - Stream_reader.bytes_stream_traversal for more details *) -let read_stream_of_bytes ?(init=[]) encoding = - bytes_stream_reader init encoding - (fun read_q ofs -> Binary.of_bytes encoding (concat_mbyte_chunks read_q ofs)) - -(* Check reading a stream of MBytes. see - Stream_reader.bytes_stream_traversal for more details *) -let check_stream_of_bytes ?(init=[]) encoding = - bytes_stream_reader init encoding (fun _ _ -> Some ()) diff --git a/src/lib_data_encoding/binary_stream_reader.ml b/src/lib_data_encoding/binary_stream_reader.ml new file mode 100644 index 000000000..ef2fced19 --- /dev/null +++ b/src/lib_data_encoding/binary_stream_reader.ml @@ -0,0 +1,367 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Binary_error + +let raise e = raise (Read_error e) + +(** Persistent state of the binary reader. *) +type state = { + + stream : Binary_stream.t ; + (** All the remaining data to be read. *) + + remaining_bytes : int option ; + (** Total number of bytes that should be from 'stream' (None = + illimited). Reading less bytes should raise [Extra_bytes] and + trying to read more bytes should raise [Not_enough_data]. *) + + total_read : int ; + (** Total number of bytes that has been read from [stream] since the + beginning. *) + +} + +(** Return type for the function [read_rec]. See [Data_encoding] for its + description. *) +type 'ret status = + | Success of { result : 'ret ; size : int ; stream : Binary_stream.t } + | Await of (MBytes.t -> 'ret status) + | Error of read_error + +let check_remaining_bytes state size = + match state.remaining_bytes with + | Some len when len < size -> raise Not_enough_data + | Some len -> Some (len - size) + | None -> None + +(** [read_atom resume size conv state k] reads [size] bytes from [state], + pass it to [conv] to be decoded, and finally call the continuation [k] + with the decoded value and the updated state. + + The function [conv] is also allowed to raise [Read_error err]. + In that case the exception is catched and [Error err] is returned. + + If there is not enough [remaining_bytes] to be read in [state], the + function returns [Error Not_enough_data] instead of calling + the continuation. + + If there is not enough [allowed_bytes] to be read in [state], the + function returns [Error Size_limit_exceeded] instead of calling + the continuation. + + If there is not enough bytes to be read in [state], the function + returns [Await resume] instead of calling the continuation. *) +let read_atom resume size conv state k = + match + let remaining_bytes = check_remaining_bytes state size in + let res, stream = Binary_stream.read state.stream size in + conv res.buffer res.ofs, + { remaining_bytes ; stream ; + total_read = state.total_read + size } + with + | exception (Read_error error) -> Error error + | exception Binary_stream.Need_more_data -> Await resume + | v -> k v (* tail call *) + +(** Reader for all the atomic types. *) +module Atom = struct + + let uint8 r = read_atom r Binary_size.uint8 MBytes.get_uint8 + let uint16 r = read_atom r Binary_size.int16 MBytes.get_uint16 + + let int8 r = read_atom r Binary_size.int8 MBytes.get_int8 + let int16 r = read_atom r Binary_size.int16 MBytes.get_int16 + let int32 r = read_atom r Binary_size.int32 MBytes.get_int32 + let int64 r = read_atom r Binary_size.int64 MBytes.get_int64 + + let float r = read_atom r Binary_size.float MBytes.get_double + + let bool resume state k = + int8 resume state @@ fun (v, state) -> + k (v <> 0, state) + + let uint30 r = + read_atom r Binary_size.uint30 @@ fun buffer ofs -> + let v = Int32.to_int (MBytes.get_int32 buffer ofs) in + if v < 0 then + raise (Invalid_int { min = 0 ; v ; max = (1 lsl 30) - 1 }) ; + v + + let int31 r = + read_atom r Binary_size.int31 @@ fun buffer ofs -> + Int32.to_int (MBytes.get_int32 buffer ofs) + + let ranged_int ~minimum ~maximum resume state k = + let read_int = + match Binary_size.range_to_size ~minimum ~maximum with + | `Int8 -> int8 + | `Int16 -> int16 + | `Int31 -> int31 + | `Uint8 -> uint8 + | `Uint16 -> uint16 + | `Uint30 -> uint30 in + read_int resume state @@ fun (ranged, state) -> + let ranged = if minimum > 0 then ranged + minimum else ranged in + if not (minimum <= ranged && ranged <= maximum) then + Error (Invalid_int { min = minimum ; v =ranged ; max = maximum }) + else + k (ranged, state) + + let ranged_float ~minimum ~maximum resume state k = + float resume state @@ fun (ranged, state) -> + if not (minimum <= ranged && ranged <= maximum) then + Error (Invalid_float { min = minimum ; v = ranged ; max = maximum }) + else + k (ranged, state) + + let z resume state k = + let res = Buffer.create 100 in + uint8 resume state @@ fun (first, state) -> + if first = 0 then + k (Z.zero, state) + else + let first_value = first land 0x3F in + let sign = (first land 0x40) <> 0 in + let rec read prev value bit state = + if prev land 0x80 = 0x00 then begin + if bit > 0 then Buffer.add_char res (Char.unsafe_chr value) ; + if prev = 0x00 then raise Trailing_zero ; + let bits = Buffer.contents res in + let res = Z.of_bits bits in + let res = if sign then Z.neg res else res in + k (res, state) + end else + let resume buffer = + let stream = Binary_stream.push buffer state.stream in + uint8 resume { state with stream } (read_next value bit) in + uint8 resume state (read_next value bit) + and read_next value bit (byte, state) = + let value = value lor ((byte land 0x7F) lsl bit) in + let bit = bit + 7 in + let bit, value = + if bit >= 8 then begin + Buffer.add_char res (Char.unsafe_chr (value land 0xFF)) ; + bit - 8, value lsr 8 + end else + bit, value in + read byte value bit state in + read first first_value 6 state + + let string_enum arr resume state k = + let read_index = + match Binary_size.enum_size arr with + | `Uint8 -> uint8 + | `Uint16 -> uint16 + | `Uint30 -> uint30 in + read_index resume state @@ fun (index, state) -> + if index >= Array.length arr then + Error No_case_matched + else + k (arr.(index), state) + + let fixed_length_bytes length r = + read_atom r length @@ fun buf ofs -> + MBytes.sub buf ofs length + + let fixed_length_string length r = + read_atom r length @@ fun buf ofs -> + MBytes.sub_string buf ofs length + + let tag = function + | `Uint8 -> uint8 + | `Uint16 -> uint16 + +end + +(** Main recursive reading function, in continuation passing style. *) +let rec read_rec + : type next ret. + next Encoding.t -> state -> ((next * state) -> ret status) -> ret status + = fun e state k -> + let resume buffer = + let stream = Binary_stream.push buffer state.stream in + try read_rec e { state with stream }k + with Read_error err -> Error err in + let open Encoding in + assert (Encoding.classify e <> `Variable || state.remaining_bytes <> None) ; + match e.encoding with + | Null -> k ((), state) + | Empty -> k ((), state) + | Constant _ -> k ((), state) + | Ignore -> k ((), state) + | Bool -> Atom.bool resume state k + | Int8 -> Atom.int8 resume state k + | Uint8 -> Atom.uint8 resume state k + | Int16 -> Atom.int16 resume state k + | Uint16 -> Atom.uint16 resume state k + | Int31 -> Atom.int31 resume state k + | Int32 -> Atom.int32 resume state k + | Int64 -> Atom.int64 resume state k + | Z -> Atom.z resume state k + | Float -> Atom.float resume state k + | Bytes (`Fixed n) -> Atom.fixed_length_bytes n resume state k + | Bytes `Variable -> + let size = remaining_bytes state in + Atom.fixed_length_bytes size resume state k + | String (`Fixed n) -> Atom.fixed_length_string n resume state k + | String `Variable -> + let size = remaining_bytes state in + Atom.fixed_length_string size resume state k + | RangedInt { minimum ; maximum } -> + Atom.ranged_int ~minimum ~maximum resume state k + | RangedFloat { minimum ; maximum } -> + Atom.ranged_float ~minimum ~maximum resume state k + | String_enum (_, arr) -> + Atom.string_enum arr resume state k + | Array e -> + read_list e state @@ fun (l, state) -> + k (Array.of_list l, state) + | List e -> read_list e state k + | (Obj (Req (_, e))) -> read_rec e state k + | (Obj (Dft (_, e, _))) -> read_rec e state k + | (Obj (Opt (`Dynamic, _, e))) -> + Atom.bool resume state @@ fun (present, state) -> + if not present then + k (None, state) + else + read_rec e state @@ fun (v, state) -> + k (Some v, state) + | (Obj (Opt (`Variable, _, e))) -> + let size = remaining_bytes state in + if size = 0 then + k (None, state) + else + read_rec e state @@ fun (v, state) -> + k (Some v, state) + | Objs (`Fixed sz, e1, e2) -> + ignore (check_remaining_bytes state sz : int option) ; + read_rec e1 state @@ fun (left, state) -> + read_rec e2 state @@ fun (right, state) -> + k ((left, right), state) + | Objs (`Dynamic, e1, e2) -> + read_rec e1 state @@ fun (left, state) -> + read_rec e2 state @@ fun (right, state) -> + k ((left, right), state) + | (Objs (`Variable, e1, e2)) -> + read_variable_pair e1 e2 state k + | Tup e -> read_rec e state k + | Tups (`Fixed sz, e1, e2) -> + ignore (check_remaining_bytes state sz : int option) ; + read_rec e1 state @@ fun (left, state) -> + read_rec e2 state @@ fun (right, state) -> + k ((left, right), state) + | Tups (`Dynamic, e1, e2) -> + read_rec e1 state @@ fun (left, state) -> + read_rec e2 state @@ fun (right, state) -> + k ((left, right), state) + | (Tups (`Variable, e1, e2)) -> + read_variable_pair e1 e2 state k + | Conv { inj ; encoding } -> + read_rec encoding state @@ fun (v, state) -> + k (inj v, state) + | Union (_, sz, cases) -> begin + Atom.tag sz resume state @@ fun (ctag, state) -> + match + List.find + (function + | Case { tag = Tag tag } -> tag = ctag + | Case { tag = Json_only } -> false) + cases + with + | exception Not_found -> Error (Unexpected_tag ctag) + | Case { encoding ; inj } -> + read_rec encoding state @@ fun (v, state) -> + k (inj v, state) + end + | Dynamic_size e -> + Atom.int32 resume state @@ fun (sz, state) -> + let sz = Int32.to_int sz in + if sz < 0 then + Error (Invalid_size sz) + else + let remaining = check_remaining_bytes state sz in + let state = { state with remaining_bytes = Some sz } in + read_rec e state @@ fun (v, state) -> + if state.remaining_bytes <> Some 0 then + Error Extra_bytes + else + k (v, { state with remaining_bytes = remaining }) + | Describe { encoding = e } -> read_rec e state k + | Def { encoding = e } -> read_rec e state k + | Splitted { encoding = e } -> read_rec e state k + | Mu (_, _, self) -> read_rec (self e) state k + | Delayed f -> read_rec (f ()) state k + +and remaining_bytes { remaining_bytes } = + match remaining_bytes with + | None -> + (* This function should only be called with a variable encoding, + for which the `remaining_bytes` should never be `None`. *) + assert false + | Some len -> len + +and read_variable_pair + : type left right ret. + left Encoding.t -> right Encoding.t -> state -> + (((left * right) * state) -> ret status) -> ret status + = fun e1 e2 state k -> + let size = remaining_bytes state in + match Encoding.classify e1, Encoding.classify e2 with + | (`Dynamic | `Fixed _), `Variable -> + read_rec e1 state @@ fun (left, state) -> + read_rec e2 state @@ fun (right, state) -> + k ((left, right), state) + | `Variable, `Fixed n -> + if n > size then + Error Not_enough_data + else + let state = { state with remaining_bytes = Some (size - n) } in + read_rec e1 state @@ fun (left, state) -> + assert (state.remaining_bytes = Some 0) ; + let state = { state with remaining_bytes = Some n } in + read_rec e2 state @@ fun (right, state) -> + assert (state.remaining_bytes = Some 0) ; + k ((left, right), state) + | _ -> assert false (* Should be rejected by [Encoding.Kind.combine] *) + +and read_list + : type a ret. + a Encoding.t -> state -> ((a list * state) -> ret status) -> ret status + = fun e state k -> + let rec loop state acc = + let size = remaining_bytes state in + if size = 0 then + k (List.rev acc, state) + else + read_rec e state @@ fun (v, state) -> + loop state (v :: acc) in + loop state [] + +let read_rec e state k = + try read_rec e state k + with Read_error err -> Error err + + + +(** ******************** *) +(** Various entry points *) + +let success (v, state) = + Success { result = v ; size = state.total_read ; stream = state.stream } + +let read_stream ?(init = Binary_stream.empty) encoding = + match Encoding.classify encoding with + | `Variable -> + invalid_arg "Data_encoding.Binary.read_stream: variable encoding" + | `Dynamic | `Fixed _ -> + (* No hardcoded read limit in a stream. *) + let state = { remaining_bytes = None ; + stream = init ; total_read = 0 } in + read_rec encoding state success diff --git a/src/lib_data_encoding/binary_stream_reader.mli b/src/lib_data_encoding/binary_stream_reader.mli new file mode 100644 index 000000000..74c86850b --- /dev/null +++ b/src/lib_data_encoding/binary_stream_reader.mli @@ -0,0 +1,18 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** This is for use *within* the data encoding library only. Instead, you should + use the corresponding module intended for use: {Data_encoding.Binary}. *) + +type 'ret status = + | Success of { result : 'ret ; size : int ; stream : Binary_stream.t } + | Await of (MBytes.t -> 'ret status) + | Error of Binary_error.read_error + +val read_stream: ?init:Binary_stream.t -> 'a Encoding.t -> 'a status diff --git a/src/lib_data_encoding/data_encoding.ml b/src/lib_data_encoding/data_encoding.ml index f16988c46..6043355b4 100644 --- a/src/lib_data_encoding/data_encoding.ml +++ b/src/lib_data_encoding/data_encoding.ml @@ -23,7 +23,9 @@ module Json = Json module Bson = Bson module Binary = struct include Binary - include Binary_stream + include Binary_error + include Binary_reader + include Binary_stream_reader end type json = Json.t diff --git a/src/lib_data_encoding/data_encoding.mli b/src/lib_data_encoding/data_encoding.mli index 843324e75..8f1b96799 100644 --- a/src/lib_data_encoding/data_encoding.mli +++ b/src/lib_data_encoding/data_encoding.mli @@ -534,29 +534,25 @@ module Binary: sig val of_bytes : 'a Encoding.t -> MBytes.t -> 'a option val of_bytes_exn : 'a Encoding.t -> MBytes.t -> 'a - (** This type is used when decoding binary data incrementally. - - In case of 'Success', the decoded data, the size of used data - to decode the result, and the remaining data are returned - - In case of error, 'Error' is returned - - 'Await' status embeds a function that waits for additional data - to continue decoding, when given data are not sufficient *) - type 'a status = - | Success of { res : 'a ; res_len : int ; remaining : MBytes.t list } - | Await of (MBytes.t -> 'a status) - | Error + type read_error = + | Not_enough_data + | Extra_bytes + | No_case_matched + | Unexpected_tag of int + | Invalid_size of int + | Invalid_int of { min : int ; v : int ; max : int } + | Invalid_float of { min : float ; v : float ; max : float } + | Trailing_zero + exception Read_error of read_error + val pp_read_error: Format.formatter -> read_error -> unit - (** This function allows to decode (or to initialize decoding) a - stream of 'MByte.t'. The given data encoding should have a - 'Fixed' or a 'Dynamic' size, otherwise an exception - 'Invalid_argument "streaming data with variable size"' is - raised *) - val read_stream_of_bytes : ?init:MBytes.t list -> 'a Encoding.t -> 'a status + type 'ret status = + | Success of { result : 'ret ; size : int ; stream : Binary_stream.t } + | Await of (MBytes.t -> 'ret status) + | Error of read_error + + val read_stream: ?init:Binary_stream.t -> 'a Encoding.t -> 'a status - (** Like read_stream_of_bytes, but only checks that the stream can - be read. Note that this is an approximation because failures - that may come from conversion functions present in encodings are - not checked *) - val check_stream_of_bytes : ?init:MBytes.t list -> 'a Encoding.t -> unit status val fixed_length : 'a Encoding.t -> int option val fixed_length_exn : 'a Encoding.t -> int diff --git a/src/lib_data_encoding/test/helpers.ml b/src/lib_data_encoding/test/helpers.ml index 0a329382b..e379d7d48 100644 --- a/src/lib_data_encoding/test/helpers.ml +++ b/src/lib_data_encoding/test/helpers.ml @@ -22,10 +22,10 @@ let no_exception f = Alcotest.failf "@[v 2>json failed:@ %a@]" (fun ppf -> Json_encoding.print_error ppf) exn - | exn -> + | Binary.Read_error error -> Alcotest.failf - "@[v 2>unexpected exception:@ %s@]" - (Printexc.to_string exn) + "@[v 2>bytes reading failed:@ %a@]" + Binary.pp_read_error error let check_raises expected f = match f () with @@ -40,14 +40,13 @@ let chunked_read sz encoding bytes = (fun status chunk -> match status with | Binary.Await f -> f chunk - | Success _ when MBytes.length chunk <> 0 -> Error - | Success _ | Error -> status) - (Binary.read_stream_of_bytes encoding) + | Success _ when MBytes.length chunk <> 0 -> Error Extra_bytes + | Success _ | Error _ -> status) + (Binary.read_stream encoding) (MBytes.cut sz bytes) in match status with - | Success { remaining ; _ } when - List.exists (fun b -> MBytes.length b <> 0) remaining -> - Binary.Error + | Success { stream ; _ } when not (Binary_stream.is_empty stream) -> + Binary.Error Extra_bytes | _ -> status let streamed_read encoding bytes = @@ -55,6 +54,6 @@ let streamed_read encoding bytes = (fun (status, count as acc) chunk -> match status with | Binary.Await f -> (f chunk, succ count) - | Success _ | Error -> acc) - (Binary.read_stream_of_bytes encoding, 0) + | Success _ | Error _ -> acc) + (Binary.read_stream encoding, 0) (MBytes.cut 1 bytes) diff --git a/src/lib_data_encoding/test/read_failure.ml b/src/lib_data_encoding/test/read_failure.ml index 8f50a00cc..f4fc2b6f2 100644 --- a/src/lib_data_encoding/test/read_failure.ml +++ b/src/lib_data_encoding/test/read_failure.ml @@ -14,19 +14,19 @@ open Helpers open Types let not_enough_data = function - | Invalid_argument _ -> true + | Binary.Read_error Not_enough_data -> true | _ -> false let extra_bytes = function - | Failure _ -> true + | Binary.Read_error Extra_bytes -> true | _ -> false let trailing_zero = function - | Failure _ -> true + | Binary.Read_error Trailing_zero -> true | _ -> false let invalid_int = function - | Data_encoding.Int_out_of_range _ -> true + | Binary.Read_error (Invalid_int _) -> true | Json_encoding.Cannot_destruct ([] , Failure _) -> true | _ -> false @@ -35,17 +35,17 @@ let invalid_string_length = function ([], Json_encoding.Unexpected ("string (len 9)", "string (len 4)")) -> true | Json_encoding.Cannot_destruct ([], Json_encoding.Unexpected ("bytes (len 9)", "bytes (len 4)")) -> true - | Failure _ -> true + | Binary.Read_error Extra_bytes -> true | _ -> false let missing_case = function | Json_encoding.Cannot_destruct ([], Json_encoding.No_case_matched _ ) -> true - | Unexpected_tag _ -> true + | Binary.Read_error (Unexpected_tag _) -> true | _ -> false let missing_enum = function | Json_encoding.Cannot_destruct ([], Json_encoding.Unexpected _ ) -> true - | No_case_matched -> true + | Binary.Read_error No_case_matched -> true | _ -> false let json ?(expected = fun _ -> true) read_encoding json () = @@ -63,7 +63,7 @@ let binary ?(expected = fun _ -> true) read_encoding bytes () = ignore (Binary.of_bytes_exn read_encoding bytes) ; end -let stream read_encoding bytes () = +let stream ?(expected = fun _ -> true) read_encoding bytes () = let len_data = MBytes.length bytes in for sz = 1 to max 1 len_data do let name = Format.asprintf "stream (%d)" sz in @@ -72,7 +72,13 @@ let stream read_encoding bytes () = Alcotest.failf "%s failed: expecting exception, got success." name | Binary.Await _ -> Alcotest.failf "%s failed: not enough data" name - | Error -> () + | Binary.Error error when expected (Binary.Read_error error) -> + () + | Binary.Error error -> + Alcotest.failf + "@[%s failed: read error@ %a@]" + name + Binary.pp_read_error error done let all ?expected name write_encoding read_encoding value = @@ -82,7 +88,7 @@ let all ?expected name write_encoding read_encoding value = [ name ^ ".json", `Quick, json ?expected read_encoding json_value ; name ^ ".bson", `Quick, bson ?expected read_encoding bson_value ; name ^ ".bytes", `Quick, binary ?expected read_encoding bytes_value ; - name ^ ".stream", `Quick, stream read_encoding bytes_value ; + name ^ ".stream", `Quick, stream ?expected read_encoding bytes_value ; ] let all_ranged_int minimum maximum = diff --git a/src/lib_data_encoding/test/success.ml b/src/lib_data_encoding/test/success.ml index 80b51aa5b..f03960f04 100644 --- a/src/lib_data_encoding/test/success.ml +++ b/src/lib_data_encoding/test/success.ml @@ -48,15 +48,18 @@ let stream ty encoding value () = for sz = 1 to max 1 len_data do let name = Format.asprintf "stream (%d)" sz in match chunked_read sz encoding bytes with - | Binary.Success { res = result ; res_len = size ; remaining } -> + | Binary.Success { result ; size ; stream } -> if size <> MBytes.length bytes || - List.exists (fun b -> MBytes.length b <> 0) remaining then + not (Binary_stream.is_empty stream) then Alcotest.failf "%s failed: remaining data" name ; Alcotest.check ty name value result | Binary.Await _ -> Alcotest.failf "%s failed: not enough data" name - | Binary.Error -> - Alcotest.failf "@[%s failed: read error@]" name + | Binary.Error error -> + Alcotest.failf + "@[%s failed: read error@ %a@]" + name + Binary.pp_read_error error done ; end @@ -97,10 +100,12 @@ let all_ranged_float minimum maximum = all (name ^ ".max") Alcotest.float encoding maximum let test_z_sequence () = - let test i = binary Alcotest.z z i () in - for i = -1_00_000 to 1_00_000 do test (Z.of_int i) done ; - for i = 100_000_000 to 100_100_000 do test (Z.of_int i) done ; - for i = -100_000_000 downto -100_100_000 do test (Z.of_int i) done + let test i = + binary Alcotest.z z i () ; + stream Alcotest.z z i () in + for i = -10_000 to 10_000 do test (Z.of_int i) done ; + for i = 100_000_000 to 100_010_000 do test (Z.of_int i) done ; + for i = -100_000_000 downto -100_010_000 do test (Z.of_int i) done let test_string_enum_boundary () = let entries = List.rev_map (fun x -> string_of_int x, x) (0 -- 254) in diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index 8f0b9e0be..845368648 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -233,14 +233,14 @@ module Reader = struct mutable worker: unit Lwt.t ; } - let read_message st init_mbytes = + let read_message st init = let rec loop status = Lwt_unix.yield () >>= fun () -> let open Data_encoding.Binary in match status with - | Success { res ; res_len ; remaining } -> - return (Some (res, res_len, remaining)) - | Error -> + | Success { result ; size ; stream } -> + return (Some (result, size, stream)) + | Error _ -> lwt_debug "[read_message] incremental decoding error" >>= fun () -> return None | Await decode_next_buf -> @@ -251,27 +251,26 @@ module Reader = struct "reading %d bytes from %a" (MBytes.length buf) P2p_connection.Info.pp st.conn.info >>= fun () -> loop (decode_next_buf buf) in - loop - (Data_encoding.Binary.read_stream_of_bytes ~init:init_mbytes st.encoding) + loop (Data_encoding.Binary.read_stream ?init st.encoding) - let rec worker_loop st init_mbytes = + let rec worker_loop st stream = begin - read_message st init_mbytes >>=? fun msg -> + read_message st stream >>=? fun msg -> match msg with | None -> protect ~canceler:st.canceler begin fun () -> Lwt_pipe.push st.messages (Error [P2p_errors.Decoding_error]) >>= fun () -> return None end - | Some (msg, size, rem_mbytes) -> + | Some (msg, size, stream) -> protect ~canceler:st.canceler begin fun () -> Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> - return (Some rem_mbytes) + return (Some stream) end end >>= function - | Ok Some rem_mbytes -> - worker_loop st rem_mbytes + | Ok (Some stream) -> + worker_loop st (Some stream) | Ok None -> Lwt_canceler.cancel st.canceler >>= fun () -> Lwt.return_unit @@ -301,7 +300,7 @@ module Reader = struct end ; st.worker <- Lwt_utils.worker "reader" - ~run:(fun () -> worker_loop st []) + ~run:(fun () -> worker_loop st None) ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ; st diff --git a/src/lib_stdlib/binary_stream.ml b/src/lib_stdlib/binary_stream.ml new file mode 100644 index 000000000..9eb2a5757 --- /dev/null +++ b/src/lib_stdlib/binary_stream.ml @@ -0,0 +1,81 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* Facilities to decode streams of binary data *) + +type buffer = { + buffer : MBytes.t ; + ofs : int ; + len : int ; +} + +type t = { + current : buffer ; + (* buffer queue (classical double list implementation) *) + pending : MBytes.t list ; + pending_rev : MBytes.t list ; + (* number unread bytes in 'current + pending + pending_rev' *) + unread : int ; +} + +let is_empty { unread ; _ } = unread = 0 + +let of_buffer current = + { current ; + pending = [] ; + pending_rev = [] ; + unread = current.len } + +let of_bytes buffer = + let len = MBytes.length buffer in + of_buffer { buffer ; ofs = 0 ; len } + +let empty = of_bytes (MBytes.create 0) + +let push buffer stream = + { stream with pending_rev = buffer :: stream.pending_rev ; + unread = stream.unread + MBytes.length buffer } + +exception Need_more_data + +let split buffer len = + assert (len <= buffer.len) ; + { buffer with len }, + { buffer with ofs = buffer.ofs + len ; len = buffer.len - len } + +let read stream len = + if len > stream.unread then raise Need_more_data ; + if len <= stream.current.len then + let res, current = split stream.current len in + res, { stream with current ; unread = stream.unread - len } + else + let res = { buffer = MBytes.create len ; ofs = 0 ; len } in + MBytes.blit + stream.current.buffer stream.current.ofs + res.buffer 0 + stream.current.len ; + let rec loop ofs pending_rev = function + | [] -> loop ofs [] (List.rev pending_rev) + | buffer :: pending -> + let current = { buffer ; ofs = 0 ; len = MBytes.length buffer } in + let to_read = len - ofs in + if to_read <= current.len then begin + MBytes.blit current.buffer 0 res.buffer ofs to_read ; + res, + { current = { current with ofs = to_read ; + len = current.len - to_read } ; + pending ; + pending_rev ; + unread = stream.unread - len ; + } + end else begin + MBytes.blit current.buffer 0 res.buffer ofs current.len ; + loop (ofs + current.len) pending_rev pending + end in + loop stream.current.len stream.pending_rev stream.pending diff --git a/src/lib_stdlib/binary_stream.mli b/src/lib_stdlib/binary_stream.mli new file mode 100644 index 000000000..5a32aec54 --- /dev/null +++ b/src/lib_stdlib/binary_stream.mli @@ -0,0 +1,24 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t + +type buffer = { + buffer : MBytes.t ; + ofs : int ; + len : int ; +} + +exception Need_more_data + +val is_empty: t -> bool +val empty: t +val of_buffer: buffer -> t +val read: t -> int -> buffer * t +val push: MBytes.t -> t -> t