Data_encoding: use CPS for the stream reader
This commit is contained in:
parent
b83797371e
commit
aee6718148
@ -20,10 +20,6 @@ type 'l writer = {
|
||||
write: 'a. 'a Encoding.t -> 'a -> MBytes.t -> int -> int ;
|
||||
}
|
||||
|
||||
(** Polymorphic reader, wrapped in a record so that one value can be applied
    at every encoding type (the [read] field is universally quantified over
    ['a]).  Where it is used, [r.read e buf ofs len] receives a buffer, a
    start offset and a byte budget, and returns the offset following the
    decoded value together with the value.  The ['l] parameter does not
    appear in the field. *)
type 'l reader = {
  read: 'a. 'a Encoding.t -> MBytes.t -> int -> int -> (int * 'a) ;
}
|
||||
|
||||
let rec length : type x. x Encoding.t -> x -> int = fun e ->
|
||||
let open Encoding in
|
||||
match e.encoding with
|
||||
@ -546,264 +542,6 @@ let to_bytes t v =
|
||||
write_rec_buffer t v bytes ;
|
||||
MBytes_buffer.to_mbytes bytes
|
||||
|
||||
(** Reader *)

(** Offset-passing readers.  Every reader has the shape
    [buf -> ofs -> len -> (ofs', value)]: it decodes one value from [buf]
    starting at [ofs], with [len] the number of bytes the caller allows it to
    use, and returns the offset just after the value. *)
module Reader = struct

  let int8 buf ofs _len =
    ofs + Binary_size.int8, MBytes.get_int8 buf ofs

  let uint8 buf ofs _len =
    ofs + Binary_size.uint8, MBytes.get_uint8 buf ofs

  let char buf ofs _len =
    ofs + Binary_size.char, MBytes.get_char buf ofs

  (** A boolean is one [int8]; any non-zero value is [true]. *)
  let bool buf ofs len =
    let ofs, v = int8 buf ofs len in
    ofs, v <> 0

  let int16 buf ofs _len =
    ofs + Binary_size.int16, MBytes.get_int16 buf ofs

  let uint16 buf ofs _len =
    ofs + Binary_size.uint16, MBytes.get_uint16 buf ofs

  (** Reads a 32-bit word and rejects negative values.
      @raise Failure when the decoded value is negative. *)
  let uint30 buf ofs _len =
    let v = Int32.to_int (MBytes.get_int32 buf ofs) in
    if v < 0 then
      failwith "Data_encoding.Binary.Reader.uint30: invalid data." ;
    ofs + Binary_size.uint30, v

  let int31 buf ofs _len =
    ofs + Binary_size.int31, Int32.to_int (MBytes.get_int32 buf ofs)

  let int32 buf ofs _len =
    ofs + Binary_size.int32, MBytes.get_int32 buf ofs

  let int64 buf ofs _len =
    ofs + Binary_size.int64, MBytes.get_int64 buf ofs

  (** Variable-length integer.  The first byte carries a continuation flag
      (0x80), a sign flag (0x40) and 6 value bits; each following byte carries
      a continuation flag and 7 value bits.  The bits are repacked into 8-bit
      chunks and handed to [Z.of_bits].
      @raise Failure when the final byte of a multi-byte value is zero. *)
  let z buf ofs _len =
    let res = Buffer.create 100 in
    (* [prev]: last byte read; [i]: bytes consumed so far;
       [value]/[bit]: pending bits not yet flushed to [res]. *)
    let rec read prev i value bit =
      if prev land 0x80 = 0x00 then begin
        if bit > 0 then Buffer.add_char res (Char.unsafe_chr value) ;
        if prev = 0x00 then failwith "trailing zeroes in Z encoding" ;
        i
      end else
        let byte = MBytes.get_uint8 buf (ofs + i) in
        let value = value lor ((byte land 0x7F) lsl bit) in
        let bit = bit + 7 in
        let bit, value =
          if bit >= 8 then begin
            Buffer.add_char res (Char.unsafe_chr (value land 0xFF)) ;
            bit - 8, value lsr 8
          end else bit, value in
        read byte (i + 1) value bit in
    let first = MBytes.get_uint8 buf ofs in
    if first = 0 then
      ofs + 1, Z.zero
    else
      let value = first land 0x3F in
      let sign = (first land 0x40) <> 0 in
      let length = read first 1 value 6 in
      let bits = Buffer.contents res in
      let res = Z.of_bits bits in
      let res = if sign then Z.neg res else res in
      ofs + length, res

  (** read a float64 (double) **)
  let float buf ofs _len =
    (*Here, float means float64, which is read using MBytes.get_double !!*)
    ofs + Binary_size.float, MBytes.get_double buf ofs

  (** Converts an [int32] to [int], checking that the round trip is lossless.
      @raise Invalid_argument when the value does not fit in [int]. *)
  let int_of_int32 i =
    let i' = Int32.to_int i in
    let i'' = Int32.of_int i' in
    if i'' = i then
      i'
    else
      invalid_arg "int_of_int32 overflow"

  let fixed_length_bytes length buf ofs _len =
    let s = MBytes.sub buf ofs length in
    ofs + length, s

  let fixed_length_string length buf ofs _len =
    let s = MBytes.sub_string buf ofs length in
    ofs + length, s

  (** Runs [r1] then [r2]; [r2] gets the budget left after [r1]. *)
  let seq r1 r2 buf ofs len =
    let ofs', v1 = r1 buf ofs len in
    let ofs'', v2 = r2 buf ofs' (len - (ofs' - ofs)) in
    ofs'', (v1, v2)

  (** Reads a pair in which exactly one side has variable size.  When the
      variable side comes first, the fixed size [n] of the second side is
      reserved by shrinking the first side's budget. *)
  let varseq r e1 e2 buf ofs len =
    let k1 = Encoding.classify e1
    and k2 = Encoding.classify e2 in
    match k1, k2 with
    | (`Dynamic | `Fixed _), `Variable ->
        let ofs', v1 = r.read e1 buf ofs len in
        let ofs'', v2 = r.read e2 buf ofs' (len - (ofs' - ofs)) in
        ofs'', (v1, v2)
    | `Variable, `Fixed n ->
        let ofs', v1 = r.read e1 buf ofs (len - n) in
        let ofs'', v2 = r.read e2 buf ofs' n in
        ofs'', (v1, v2)
    | _ -> assert false (* Should be rejected by Kind.combine *)

  (** Reads elements with [read] until the budget [len] is exhausted.  The
      assertions require the budget to stay non-negative and every element
      to consume at least one byte. *)
  let list read buf ofs len =
    let rec loop acc ofs len =
      assert (len >= 0);
      if len <= 0
      then ofs, List.rev acc
      else
        let ofs', v = read buf ofs len in
        assert (ofs' > ofs);
        loop (v :: acc) ofs' (len - (ofs' - ofs))
    in
    loop [] ofs len

  let array read buf ofs len =
    let ofs, l = list read buf ofs len in
    ofs, Array.of_list l

  (** Post-processes the value decoded by [r] with [inj]. *)
  let conv inj r buf ofs len =
    let ofs, v = r buf ofs len in
    ofs, inj v

  let read_tag = function
    | `Uint8 -> uint8
    | `Uint16 -> uint16

  (** Reads a tag of size [sz], then the payload of the case carrying that
      tag.  [Json_only] cases are ignored.
      @raise Unexpected_tag when no case carries the decoded tag. *)
  let union r sz cases =
    let open Encoding in
    let read_cases =
      TzList.filter_map
        (function
          | (Case { tag = Json_only }) -> None
          | (Case { encoding = e ; inj ; tag = Tag tag }) ->
              let read = r.read e in
              Some (tag, fun buf ofs len ->
                  let ofs, v = read buf ofs len in
                  ofs, inj v))
        cases in
    fun buf ofs len ->
      let ofs, tag = read_tag sz buf ofs len in
      (* Only the lookup is guarded: a [Not_found] raised while decoding the
         payload must not be mistaken for an unknown tag. *)
      match List.assoc tag read_cases with
      | exception Not_found -> raise (Unexpected_tag tag)
      | read_case -> read_case buf ofs (len - Binary_size.tag_size sz)

end
|
||||
|
||||
(** [read_rec e] builds the offset-passing reader for encoding [e]: a
    function [buf -> ofs -> len -> (ofs', value)] where [len] is the number of
    bytes available to the value and [ofs'] is the offset just after it.
    Failures are reported through exceptions ([Failure], [Int_out_of_range],
    [Float_out_of_range], [No_case_matched], [Unexpected_tag],
    [Invalid_size], failed assertions, and whatever the [MBytes] accessors
    raise). *)
let rec read_rec : type a. a Encoding.t-> MBytes.t -> int -> int -> int * a = fun e ->
  let open Encoding in
  let open Reader in
  match e.encoding with
  | Null -> (fun _buf ofs _len -> ofs, ())
  | Empty -> (fun _buf ofs _len -> ofs, ())
  | Constant _ -> (fun _buf ofs _len -> ofs, ())
  (* [Ignore] swallows the whole remaining budget. *)
  | Ignore -> (fun _buf ofs len -> ofs + len, ())
  | Bool -> bool
  | Int8 -> int8
  | Uint8 -> uint8
  | Int16 -> int16
  | Uint16 -> uint16
  | Int31 -> int31
  | Int32 -> int32
  | Int64 -> int64
  | Z -> z
  | RangedInt { minimum ; maximum } ->
      (fun buf ofs len ->
         let ofs, value =
           match Binary_size.range_to_size ~minimum ~maximum with
           | `Int8 -> int8 buf ofs len
           | `Int16 -> int16 buf ofs len
           | `Int31 -> int31 buf ofs len
           | `Uint8 -> uint8 buf ofs len
           | `Uint16 -> uint16 buf ofs len
           | `Uint30 -> uint30 buf ofs len in
         (* Strictly positive ranges are stored relative to [minimum]. *)
         let value = if minimum > 0 then value + minimum else value in
         if value < minimum || value > maximum
         then raise (Int_out_of_range (value, minimum, maximum)) ;
         (ofs, value))
  | Float -> float
  | RangedFloat { minimum ; maximum } ->
      (fun buf ofs len ->
         let offset, value = float buf ofs len in
         if value < minimum || value > maximum
         then raise (Float_out_of_range (value, minimum, maximum)) ;
         (offset, value))
  | Bytes (`Fixed n) -> fixed_length_bytes n
  | String (`Fixed n) -> fixed_length_string n
  (* Variable-size payloads take every byte of the budget. *)
  | Bytes `Variable -> fun buf ofs len -> fixed_length_bytes len buf ofs len
  | String `Variable -> fun buf ofs len -> fixed_length_string len buf ofs len
  | String_enum (_, arr) -> begin
      fun buf ofs len ->
        let ofs, ind =
          match Binary_size.enum_size arr with
          | `Uint8 -> uint8 buf ofs len
          | `Uint16 -> uint16 buf ofs len
          | `Uint30 -> uint30 buf ofs len in
        if ind >= Array.length arr
        then raise No_case_matched
        else (ofs, arr.(ind))
    end
  | Array e -> array (read_rec e)
  | List e -> list (read_rec e)
  | Obj (Req (_, e)) -> read_rec e
  | Obj (Opt (`Dynamic, _, t)) ->
      let read = read_rec t in
      (fun buf ofs len ->
         (* One presence byte, then the value when the byte is non-zero. *)
         let ofs, v = int8 buf ofs len in
         if v = 0 then ofs, None
         else let ofs, v = read buf ofs (len - Binary_size.int8) in ofs, Some v)
  | Obj (Opt (`Variable, _, t)) ->
      let read = read_rec t in
      (fun buf ofs len ->
         (* Presence is signalled by a non-empty budget. *)
         if len = 0 then ofs, None
         else
           let ofs', v = read buf ofs len in
           assert (ofs' = ofs + len) ;
           ofs + len, Some v)
  | Obj (Dft (_, e, _)) -> read_rec e
  | Objs ((`Fixed _ | `Dynamic), e1, e2) ->
      seq (read_rec e1) (read_rec e2)
  | Objs (`Variable, e1, e2) ->
      varseq { read = fun t -> read_rec t } e1 e2
  | Tup e -> read_rec e
  | Tups ((`Fixed _ | `Dynamic), e1, e2) ->
      seq (read_rec e1) (read_rec e2)
  | Tups (`Variable, e1, e2) ->
      varseq { read = fun t -> read_rec t } e1 e2
  | Conv { inj ; encoding = e } -> conv inj (read_rec e)
  | Describe { encoding = e } -> read_rec e
  | Def { encoding = e } -> read_rec e
  | Splitted { encoding = e } -> read_rec e
  | Union (_, sz, cases) ->
      union { read = fun t -> read_rec t } sz cases
  (* Eta-expanded so that the recursive encoding is only unfolded when a
     value is actually read, not when the reader is built. *)
  | Mu (_, _, self) -> fun buf ofs len -> read_rec (self e) buf ofs len
  | Dynamic_size e ->
      let read = read_rec e in
      fun buf ofs len ->
        (* A 32-bit size prefix gives the budget of the payload. *)
        let ofs, sz = int32 buf ofs len in
        let sz = Int32.to_int sz in
        if sz < 0 then raise (Invalid_size sz);
        read buf ofs sz
  | Delayed f -> read_rec (f ())
|
||||
|
||||
(** [read t buf ofs len] decodes a value of encoding [t] from [buf], starting
    at [ofs] with a budget of [len] bytes.  Returns [Some (next_ofs, v)], or
    [None] when decoding raises.  Every exception is deliberately turned into
    [None]: the readers signal malformed input through several unrelated
    exception types. *)
let read t buf ofs len =
  try Some (read_rec t buf ofs len)
  with _ -> None

let write = write

(** [of_bytes_exn ty buf] decodes the whole of [buf] as a value of encoding
    [ty].
    @raise Failure when decoding stops before the end of [buf]; other
    exceptions raised by the readers are propagated unchanged. *)
let of_bytes_exn ty buf =
  let len = MBytes.length buf in
  let read_len, r = read_rec ty buf 0 len in
  if read_len <> len then
    failwith "Data_encoding.Binary.of_bytes_exn: remaining data" ;
  r

(** Same as [of_bytes_exn], returning [None] instead of raising. *)
let of_bytes ty buf =
  try Some (of_bytes_exn ty buf)
  with _ -> None

let to_bytes = to_bytes

let length = length
|
||||
|
@ -11,10 +11,7 @@
|
||||
use the corresponding module intended for use: {Data_encoding.Binary}. *)
|
||||
|
||||
(** Size of a value under the given encoding.
    NOTE(review): presumably the byte length of its binary form; the
    implementation is only partly visible here, confirm. *)
val length : 'a Encoding.t -> 'a -> int

(** [read enc buf ofs len] decodes one value from [buf] starting at [ofs]
    with a budget of [len] bytes.  Returns the offset following the value
    together with the value, or [None] when decoding fails. *)
val read : 'a Encoding.t -> MBytes.t -> int -> int -> (int * 'a) option

(** [write enc v buf ofs] writes [v] into [buf].
    NOTE(review): presumably returns the offset after the written bytes, and
    [None] on failure; implementation not visible here, confirm. *)
val write : 'a Encoding.t -> 'a -> MBytes.t -> int -> int option

(** Serializes a value into a freshly built buffer. *)
val to_bytes : 'a Encoding.t -> 'a -> MBytes.t

(** Decodes a whole buffer; [None] when decoding fails or does not consume
    the buffer entirely. *)
val of_bytes : 'a Encoding.t -> MBytes.t -> 'a option

(** Same as [of_bytes], raising instead of returning [None]. *)
val of_bytes_exn : 'a Encoding.t -> MBytes.t -> 'a

(** NOTE(review): presumably the size of the encoding when it is fixed;
    implementation not visible here, confirm. *)
val fixed_length : 'a Encoding.t -> int option

(** Raising variant of [fixed_length]. *)
val fixed_length_exn : 'a Encoding.t -> int
|
||||
|
38
src/lib_data_encoding/binary_error.ml
Normal file
38
src/lib_data_encoding/binary_error.ml
Normal file
@ -0,0 +1,38 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** Reasons for which reading a binary value can fail. *)
type read_error =
  | Not_enough_data
  | Extra_bytes
  | No_case_matched
  | Unexpected_tag of int
  | Invalid_size of int
  | Invalid_int of { min : int ; v : int ; max : int }
  | Invalid_float of { min : float ; v : float ; max : float }
  | Trailing_zero

(** [pp_read_error ppf err] prints a short English description of [err] on
    [ppf].  The messages (including the trailing space of the two range
    messages) are kept exactly as they were. *)
let pp_read_error ppf = function
  | Not_enough_data ->
      Format.fprintf ppf "Not enough data"
  | Extra_bytes ->
      Format.fprintf ppf "Extra bytes"
  | No_case_matched ->
      Format.fprintf ppf "No case matched"
  | Unexpected_tag tag ->
      Format.fprintf ppf "Unexpected tag %d" tag
  | Invalid_size sz ->
      Format.fprintf ppf "Invalid size %d" sz
  | Invalid_int { min ; v ; max} ->
      Format.fprintf ppf "Invalid int (%d <= %d <= %d) " min v max
  | Invalid_float { min ; v ; max} ->
      Format.fprintf ppf "Invalid float (%f <= %f <= %f) " min v max
  | Trailing_zero ->
      Format.fprintf ppf "Trailing zero in Z"

(** Exception carrying a [read_error]. *)
exception Read_error of read_error
|
23
src/lib_data_encoding/binary_error.mli
Normal file
23
src/lib_data_encoding/binary_error.mli
Normal file
@ -0,0 +1,23 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** This is for use *within* the data encoding library only. Instead, you should
|
||||
use the corresponding module intended for use: {Data_encoding.Binary}. *)
|
||||
|
||||
(** Reasons for which reading a binary value can fail. *)
type read_error =
  | Not_enough_data
  | Extra_bytes
  | No_case_matched
  | Unexpected_tag of int
  | Invalid_size of int
  | Invalid_int of { min : int ; v : int ; max : int }
  | Invalid_float of { min : float ; v : float ; max : float }
  | Trailing_zero

(** Exception carrying a [read_error]. *)
exception Read_error of read_error

(** Prints a short English description of a [read_error]. *)
val pp_read_error: Format.formatter -> read_error -> unit
|
285
src/lib_data_encoding/binary_reader.ml
Normal file
285
src/lib_data_encoding/binary_reader.ml
Normal file
@ -0,0 +1,285 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
open Binary_error
|
||||
|
||||
(** Shadows the standard [raise] for the rest of this file: [raise e] wraps
    the read error [e] into the [Read_error] exception before raising it. *)
let raise e = raise (Read_error e)
|
||||
|
||||
(** Mutable cursor threaded through the readers. *)
type state = {
  buffer : MBytes.t ;              (* bytes being decoded *)
  mutable offset : int ;           (* position of the next byte to read *)
  mutable remaining_bytes : int ;  (* bytes the current value may still use *)
}
|
||||
|
||||
(** [check_remaining_bytes state size] checks that at least [size] bytes are
    left in the budget of [state] and returns the budget that would remain
    after consuming them.  [state] itself is not modified.
    Raises [Read_error Not_enough_data] when fewer than [size] bytes are
    left. *)
let check_remaining_bytes state size =
  if state.remaining_bytes < size then
    raise Not_enough_data ;
  state.remaining_bytes - size
|
||||
|
||||
(** [read_atom size conv state] reads one atom of [size] bytes:
    it checks the budget, applies [conv buffer offset] at the current
    position, and only then advances the offset and shrinks the budget, so
    [state] is left untouched when the check or [conv] raises. *)
let read_atom size conv state =
  let remaining_bytes = check_remaining_bytes state size in
  let res = conv state.buffer state.offset in
  state.offset <- state.offset + size ;
  state.remaining_bytes <- remaining_bytes ;
  res
|
||||
|
||||
(** Reader for all the atomic types.  Every reader takes the mutable [state],
    consumes the bytes of one value and returns it; failures are raised as
    [Read_error]. *)
module Atom = struct

  let uint8 = read_atom Binary_size.uint8 MBytes.get_uint8
  (* Sized with the [uint16] constant, to match the accessor used. *)
  let uint16 = read_atom Binary_size.uint16 MBytes.get_uint16

  let int8 = read_atom Binary_size.int8 MBytes.get_int8
  let int16 = read_atom Binary_size.int16 MBytes.get_int16
  let int32 = read_atom Binary_size.int32 MBytes.get_int32
  let int64 = read_atom Binary_size.int64 MBytes.get_int64

  let float = read_atom Binary_size.float MBytes.get_double

  (** A boolean is one [int8]; any non-zero value is [true]. *)
  let bool state = int8 state <> 0

  (** Reads a 32-bit word and rejects negative values with [Invalid_int]. *)
  let uint30 =
    read_atom Binary_size.uint30 @@ fun buffer ofs ->
    let v = Int32.to_int (MBytes.get_int32 buffer ofs) in
    if v < 0 then
      raise (Invalid_int { min = 0 ; v ; max = (1 lsl 30) - 1 }) ;
    v

  let int31 =
    read_atom Binary_size.int31 @@ fun buffer ofs ->
    Int32.to_int (MBytes.get_int32 buffer ofs)

  (** Reads an integer using the representation chosen by
      [Binary_size.range_to_size], shifts it by [minimum] when [minimum] is
      strictly positive, and raises [Invalid_int] when the result is outside
      [minimum..maximum]. *)
  let ranged_int ~minimum ~maximum state =
    let read_int =
      match Binary_size.range_to_size ~minimum ~maximum with
      | `Int8 -> int8
      | `Int16 -> int16
      | `Int31 -> int31
      | `Uint8 -> uint8
      | `Uint16 -> uint16
      | `Uint30 -> uint30 in
    let ranged = read_int state in
    let ranged = if minimum > 0 then ranged + minimum else ranged in
    if not (minimum <= ranged && ranged <= maximum) then
      raise (Invalid_int { min = minimum ; v =ranged ; max = maximum }) ;
    ranged

  (** Reads a float and raises [Invalid_float] when it is outside
      [minimum..maximum]. *)
  let ranged_float ~minimum ~maximum state =
    let ranged = float state in
    if not (minimum <= ranged && ranged <= maximum) then
      raise (Invalid_float { min = minimum ; v = ranged ; max = maximum }) ;
    ranged

  (** Variable-length integer.  The first byte carries a continuation flag
      (0x80), a sign flag (0x40) and 6 value bits; each following byte carries
      a continuation flag and 7 value bits.  The bits are repacked into 8-bit
      chunks and handed to [Z.of_bits].  Raises [Trailing_zero] when the
      final byte of a multi-byte value is zero. *)
  let z state =
    let res = Buffer.create 100 in
    let first = uint8 state in
    if first = 0 then
      Z.zero
    else
      let first_value = first land 0x3F in
      let sign = (first land 0x40) <> 0 in
      (* [prev]: last byte read; [value]/[bit]: pending bits not yet
         flushed to [res]. *)
      let rec read prev value bit state =
        if prev land 0x80 = 0x00 then begin
          if bit > 0 then Buffer.add_char res (Char.unsafe_chr value) ;
          if prev = 0x00 then raise Trailing_zero ;
          let bits = Buffer.contents res in
          let res = Z.of_bits bits in
          if sign then Z.neg res else res
        end else
          let byte = uint8 state in
          let value = value lor ((byte land 0x7F) lsl bit) in
          let bit = bit + 7 in
          let bit, value =
            if bit >= 8 then begin
              Buffer.add_char res (Char.unsafe_chr (value land 0xFF)) ;
              bit - 8, value lsr 8
            end else
              bit, value in
          read byte value bit state in
      read first first_value 6 state

  (** Reads an index (its width is chosen by [Binary_size.enum_size]) and
      returns the matching element of [arr]; raises [No_case_matched] when
      the index is past the end of [arr]. *)
  let string_enum arr state =
    let read_index =
      match Binary_size.enum_size arr with
      | `Uint8 -> uint8
      | `Uint16 -> uint16
      | `Uint30 -> uint30 in
    let index = read_index state in
    if index >= Array.length arr then
      raise No_case_matched ;
    arr.(index)

  let fixed_length_bytes length =
    read_atom length @@ fun buf ofs ->
    MBytes.sub buf ofs length

  let fixed_length_string length =
    read_atom length @@ fun buf ofs ->
    MBytes.sub_string buf ofs length

  let tag = function
    | `Uint8 -> uint8
    | `Uint16 -> uint16

end
|
||||
|
||||
(** Main recursive reading function.  It is written in direct style: it
    consumes bytes from the mutable [state] and returns the decoded value.
    [state.remaining_bytes] is the budget of the value being read; the
    variable-size cases (bytes, strings, lists, optional fields) use it to
    know where the value stops.  Failures are raised as [Read_error]. *)
let rec read_rec : type ret. ret Encoding.t -> state -> ret
  = fun e state ->
    let open Encoding in
    match e.encoding with
    | Null -> ()
    | Empty -> ()
    | Constant _ -> ()
    | Ignore -> ()
    | Bool -> Atom.bool state
    | Int8 -> Atom.int8 state
    | Uint8 -> Atom.uint8 state
    | Int16 -> Atom.int16 state
    | Uint16 -> Atom.uint16 state
    | Int31 -> Atom.int31 state
    | Int32 -> Atom.int32 state
    | Int64 -> Atom.int64 state
    | Z -> Atom.z state
    | Float -> Atom.float state
    | Bytes (`Fixed n) -> Atom.fixed_length_bytes n state
    | Bytes `Variable ->
        (* Variable-size payloads take every byte of the budget. *)
        Atom.fixed_length_bytes state.remaining_bytes state
    | String (`Fixed n) -> Atom.fixed_length_string n state
    | String `Variable ->
        Atom.fixed_length_string state.remaining_bytes state
    | RangedInt { minimum ; maximum } ->
        Atom.ranged_int ~minimum ~maximum state
    | RangedFloat { minimum ; maximum } ->
        Atom.ranged_float ~minimum ~maximum state
    | String_enum (_, arr) ->
        Atom.string_enum arr state
    | Array e ->
        let l = read_list e state in
        Array.of_list l
    | List e -> read_list e state
    | (Obj (Req (_, e))) -> read_rec e state
    | (Obj (Dft (_, e, _))) -> read_rec e state
    | (Obj (Opt (`Dynamic, _, e))) ->
        (* One presence byte, then the value when the byte is non-zero. *)
        let present = Atom.bool state in
        if not present then
          None
        else
          Some (read_rec e state)
    | (Obj (Opt (`Variable, _, e))) ->
        (* Presence is signalled by a non-empty budget. *)
        if state.remaining_bytes = 0 then
          None
        else
          Some (read_rec e state)
    | Objs (`Fixed sz, e1, e2) ->
        (* Fail early when the whole fixed-size pair cannot fit. *)
        ignore (check_remaining_bytes state sz : int) ;
        let left = read_rec e1 state in
        let right = read_rec e2 state in
        (left, right)
    | Objs (`Dynamic, e1, e2) ->
        let left = read_rec e1 state in
        let right = read_rec e2 state in
        (left, right)
    | (Objs (`Variable, e1, e2)) ->
        read_variable_pair e1 e2 state
    | Tup e -> read_rec e state
    | Tups (`Fixed sz, e1, e2) ->
        ignore (check_remaining_bytes state sz : int) ;
        let left = read_rec e1 state in
        let right = read_rec e2 state in
        (left, right)
    | Tups (`Dynamic, e1, e2) ->
        let left = read_rec e1 state in
        let right = read_rec e2 state in
        (left, right)
    | (Tups (`Variable, e1, e2)) ->
        read_variable_pair e1 e2 state
    | Conv { inj ; encoding } ->
        inj (read_rec encoding state)
    | Union (_, sz, cases) ->
        let ctag = Atom.tag sz state in
        (* Only the lookup is guarded, so a [Not_found] raised while reading
           the payload is not mistaken for an unknown tag. *)
        let Case { encoding ; inj } =
          try
            List.find
              (function
                | Case { tag = Tag tag } -> tag = ctag
                | Case { tag = Json_only } -> false)
              cases
          with Not_found -> raise (Unexpected_tag ctag) in
        inj (read_rec encoding state)
    | Dynamic_size e ->
        (* A 32-bit size prefix gives the budget of the payload; the outer
           budget is restored once the payload has been read entirely. *)
        let sz = Atom.int32 state in
        let sz = Int32.to_int sz in
        if sz < 0 then raise (Invalid_size sz) ;
        let remaining = check_remaining_bytes state sz in
        state.remaining_bytes <- sz ;
        let v = read_rec e state in
        if state.remaining_bytes <> 0 then raise Extra_bytes ;
        state.remaining_bytes <- remaining ;
        v
    | Describe { encoding = e } -> read_rec e state
    | Def { encoding = e } -> read_rec e state
    | Splitted { encoding = e } -> read_rec e state
    | Mu (_, _, self) -> read_rec (self e) state
    | Delayed f -> read_rec (f ()) state


(** Reads a pair in which exactly one side has variable size.  When the
    variable side comes first, the [n] bytes of the fixed side are withheld
    from the budget while the left value is read, then handed back for the
    right value. *)
and read_variable_pair
  : type left right.
    left Encoding.t -> right Encoding.t -> state -> (left * right)
  = fun e1 e2 state ->
    match Encoding.classify e1, Encoding.classify e2 with
    | (`Dynamic | `Fixed _), `Variable ->
        let left = read_rec e1 state in
        let right = read_rec e2 state in
        (left, right)
    | `Variable, `Fixed n ->
        if n > state.remaining_bytes then raise Not_enough_data ;
        state.remaining_bytes <- state.remaining_bytes - n ;
        let left = read_rec e1 state in
        (* Whether the left value uses its whole budget can depend on the
           input, so leftover bytes are reported as a read error rather than
           as a failed assertion, which the option-returning entry points
           would not catch. *)
        if state.remaining_bytes <> 0 then raise Extra_bytes ;
        state.remaining_bytes <- n ;
        let right = read_rec e2 state in
        (* NOTE(review): assumes an encoding classified [`Fixed n] consumes
           exactly [n] bytes, which would make this an internal invariant;
           confirm. *)
        assert (state.remaining_bytes = 0) ;
        (left, right)
    | _ -> assert false (* Should be rejected by [Encoding.Kind.combine] *)

(** Reads elements of encoding [e] until the budget of [state] is exhausted,
    and returns them in reading order. *)
and read_list : type a. a Encoding.t -> state -> a list
  = fun e state ->
    let rec loop acc =
      if state.remaining_bytes = 0 then
        List.rev acc
      else
        let v = read_rec e state in
        loop (v :: acc) in
    loop []
|
||||
|
||||
|
||||
|
||||
(** ******************** *)
|
||||
(** Various entry points *)
|
||||
|
||||
(** [read encoding buffer ofs len] decodes one value from [buffer], starting
    at [ofs] with a budget of [len] bytes.  Returns [Some (next_ofs, v)], or
    [None] when a [Read_error] is raised.  Other exceptions are not caught. *)
let read encoding buffer ofs len =
  let state =
    { buffer ; offset = ofs ; remaining_bytes = len } in
  match read_rec encoding state with
  | exception Read_error _ -> None
  | v -> Some (state.offset, v)
|
||||
|
||||
(** [of_bytes_exn encoding buffer] decodes the whole of [buffer].
    Raises [Read_error Extra_bytes] when decoding stops before the end of
    [buffer]; other read errors are propagated as [Read_error] too. *)
let of_bytes_exn encoding buffer =
  let len = MBytes.length buffer in
  let state =
    { buffer ; offset = 0 ; remaining_bytes = len } in
  let v = read_rec encoding state in
  if state.offset <> len then raise Extra_bytes ;
  v
|
||||
|
||||
(** Same as [of_bytes_exn], returning [None] when a [Read_error] is raised.
    Other exceptions are not caught. *)
let of_bytes encoding buffer =
  try Some (of_bytes_exn encoding buffer)
  with Read_error _ -> None
|
15
src/lib_data_encoding/binary_reader.mli
Normal file
15
src/lib_data_encoding/binary_reader.mli
Normal file
@ -0,0 +1,15 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** This is for use *within* the data encoding library only. Instead, you should
|
||||
use the corresponding module intended for use: {Data_encoding.Binary}. *)
|
||||
|
||||
(** [read enc buf ofs len] decodes one value from [buf], starting at [ofs]
    with a budget of [len] bytes.  Returns the offset following the value
    together with the value, or [None] on a read error. *)
val read: 'a Encoding.t -> MBytes.t -> int -> int -> (int * 'a) option

(** Decodes a whole buffer; [None] on a read error, including leftover
    bytes. *)
val of_bytes: 'a Encoding.t -> MBytes.t -> 'a option

(** Same as [of_bytes], but raises [Binary_error.Read_error] instead of
    returning [None]. *)
val of_bytes_exn: 'a Encoding.t -> MBytes.t -> 'a
|
@ -27,7 +27,6 @@ let tag_size = function
|
||||
| `Uint8 -> uint8
|
||||
| `Uint16 -> uint16
|
||||
|
||||
|
||||
(** Tags naming the integer representations, split by signedness;
    [integer] is the union of both families. *)
type signed_integer = [ `Int31 | `Int16 | `Int8 ]
type unsigned_integer = [ `Uint30 | `Uint16 | `Uint8 ]
type integer = [ signed_integer | unsigned_integer ]
|
||||
@ -62,4 +61,3 @@ let range_to_size ~minimum ~maximum : integer =
|
||||
|
||||
(** [enum_size arr] is the unsigned representation selected by
    [unsigned_range_to_size] for the number of elements of [arr]. *)
let enum_size arr =
  unsigned_range_to_size (Array.length arr)
|
||||
|
||||
|
@ -1,470 +0,0 @@
|
||||
|
||||
(* Facilities to decode streams of binary data *)
|
||||
|
||||
(** Outcome of an incremental decoding step.
    NOTE(review): field meanings are inferred from their names only
    ([res]: decoded value, [res_len]: its length, [remaining]: unread
    chunks); confirm against the functions that build these values. *)
type 'a status =
  | Success of { res : 'a ; res_len : int ; remaining : MBytes.t list }
  | Await of (MBytes.t -> 'a status)  (* resume once given another chunk *)
  | Error
|
||||
|
||||
(* Used as a zipper to code the function read_checker with the
   ability to stop and wait for more data. In the 'P_seq' case, the data
   length is parameterized by the current offset. Hence, it is a
   function 'fun_data_len'. For the 'P_list' case, we store the
   base offset (before starting to read the elements) and the
   number of elements that have been read so far. *)
type path =
  | P_top : path
  | P_await : { path : path ; encoding : 'a Encoding.t ; data_len : int } -> path
  | P_seq : { path : path ; encoding : 'a Encoding.t ;
              fun_data_len : int -> int } -> path
  | P_list : { path:path ; encoding:'a Encoding.t ; data_len : int ;
               base_ofs : int ; nb_elts_read : int } -> path
|
||||
|
||||
(* Used to accumulate the given mbytes when reading a list of blocks,
   as well as the current offset and the number of unread bytes. *)
type mbytes_stream = {
  past : MBytes.t Queue.t ; (* data that have been entirely read *)
  future : (MBytes.t * int) Queue.t ; (* data that are not (fully) read *)
  mutable past_len : int ; (* length of the concatenation of data in 'past' *)
  mutable unread : int ; (* number of cells that are unread in 'future' *)
  ofs : int (* current absolute offset wrt the concatenation past @ future *)
}
|
||||
|
||||
(* Exception raised when additional mbytes are needed to continue
   decoding; it carries the stream as it was when the data ran out. *)
exception Need_more_data of mbytes_stream
|
||||
|
||||
(* Read a piece of data that is spread over many mbytes.
   [d_ofs] bytes are gathered, starting at relative offset [ofs] of the first
   block of [buf.future], into a temporary buffer on which [reader] is then
   applied.  Blocks that are read up to their end are moved from
   [buf.future] to [buf.past]. *)
let read_from_many_blocks reader buf ofs d_ofs =
  let tmp = MBytes.create d_ofs in (* we will merge the data in this mbyte *)
  let r = ref d_ofs in (* to count the cells still to be read *)
  let rel_ofs = ref ofs in (* = ofs for the first mbyte, 0 for the others *)
  while !r > 0 do
    assert (not (Queue.is_empty buf.future)) ;
    let b, len_b = Queue.peek buf.future in (* take the next mbyte *)
    let len_chunk = len_b - !rel_ofs in (* the number of cells to read *)
    if !r >= len_chunk then
      begin (* move b to 'past' if it is read entirely *)
        ignore (Queue.pop buf.future) ;
        Queue.push b buf.past ;
        buf.past_len <- buf.past_len + len_b ;
      end ;
    (* copy (min !r len_chunk) data from b to tmp *)
    MBytes.blit b !rel_ofs tmp (d_ofs - !r) (min !r len_chunk) ;
    r := !r - len_chunk ; (* len_chunk data read during this round *)
    rel_ofs := 0 ; (* the next mbytes will be read starting from zero *)
  done ;
  reader tmp 0 d_ofs
|
||||
|
||||
|
||||
(* Generic function that reads data from an mbytes_stream. It is
   parameterized by a function "reader" that effectively reads the
   data: [reader block ofs len].  Returns the stream advanced by [delta_ofs]
   together with the result of [reader].  Raises [Need_more_data] when fewer
   than [delta_ofs] unread bytes are available. *)
let generic_read_data delta_ofs reader buf =
  let absolute_ofs = buf.ofs in
  if buf.unread < delta_ofs then (* not enough data *)
    raise (Need_more_data buf) ;
  if delta_ofs = 0 then (* we'll read nothing *)
    buf, reader (MBytes.create 0) 0 0
  else
    let new_ofs = absolute_ofs + delta_ofs in
    let ofs = absolute_ofs - buf.past_len in (* relative ofs wrt 'future' *)
    buf.unread <- buf.unread-delta_ofs ; (* 'delta_ofs' cells will be read *)
    assert (not (Queue.is_empty buf.future)) ; (* we have some data to read *)
    let b, len_b = Queue.peek buf.future in
    let buf = { buf with ofs = new_ofs } in
    if ofs + delta_ofs > len_b then
      (* should read data from many mbytes *)
      buf, read_from_many_blocks reader buf ofs delta_ofs
    else
      begin
        if ofs + delta_ofs = len_b then
          begin (* the rest of b will be entirely read. Put it in 'past' *)
            ignore (Queue.pop buf.future) ;
            Queue.push b buf.past ;
            buf.past_len <- buf.past_len + len_b ;
          end ;
        buf, reader b ofs delta_ofs
      end
|
||||
|
||||
open Encoding (* open here, shadow below, use shadowed definitions later *)
|
||||
|
||||
(* Functions that try to read data from a given mbytes_stream,
   or raise Need_more_data.  Each one returns the advanced stream together
   with the decoded value. *)

let int8 buf =
  generic_read_data Binary_size.int8 (fun x y _ -> MBytes.get_int8 x y) buf

let uint8 buf =
  generic_read_data Binary_size.uint8 (fun x y _ -> MBytes.get_uint8 x y) buf

(* Read through [uint8]: [Char.chr] only accepts 0..255, and a signed read
   would presumably yield negative values for bytes >= 0x80. *)
let char buf =
  let buf, v = uint8 buf in
  buf, Char.chr v

(* A boolean is one int8; any non-zero value is true. *)
let bool buf =
  let buf, v = int8 buf in
  buf, v <> 0

let int16 buf =
  generic_read_data Binary_size.int16 (fun x y _ -> MBytes.get_int16 x y) buf

let uint16 buf =
  generic_read_data Binary_size.uint16 (fun x y _ -> MBytes.get_uint16 x y) buf

(* Reads a 32-bit word and fails on negative values. *)
let uint30 buf =
  generic_read_data Binary_size.uint30
    (fun x y _ ->
       let v = Int32.to_int (MBytes.get_int32 x y) in
       if v < 0 then
         failwith "Data_encoding.Binary.Reader.uint30: invalid data." ;
       v) buf

let int31 buf =
  generic_read_data Binary_size.int31
    (fun x y _ -> Int32.to_int (MBytes.get_int32 x y)) buf

let int32 buf =
  generic_read_data Binary_size.int32 (fun x y _ -> MBytes.get_int32 x y) buf

let int64 buf =
  generic_read_data Binary_size.int64 (fun x y _ -> MBytes.get_int64 x y) buf

(** read a float64 (double) **)
let float buf =
  (*Here, float means float64, which is read using MBytes.get_double !!*)
  generic_read_data Binary_size.float (fun x y _ -> MBytes.get_double x y) buf

let fixed_length_bytes length buf =
  generic_read_data length MBytes.sub buf

let fixed_length_string length buf =
  generic_read_data length MBytes.sub_string buf

let read_tag = function
  | `Uint8 -> uint8
  | `Uint16 -> uint16
|
||||
|
||||
(* Auxiliary function: computing the size of data in branches
   Objs(`Variable) and Tups(`Variable).  Returns the budget of the first
   component and a function giving the budget of the second component from
   the offset reached after reading the first one. *)
let varseq_lengths e1 e2 ofs len = match Encoding.classify e1, Encoding.classify e2 with
  | (`Dynamic | `Fixed _), `Variable -> len, (fun ofs' -> len - ofs' + ofs)
  | `Variable, `Fixed n -> (len - n), (fun _ -> n)
  | _ -> assert false (* Should be rejected by Kind.combine *)
|
||||
|
||||
|
||||
(* Incremental variant of [read_rec] that only checks binary data.

   [data_checker path e buf len] checks that the stream [buf] holds a value
   of encoding [e], then carries on with the pending work described by
   [path] (see [next_path]). [len] is the length of the data; it must be
   non-negative whenever [e] is classified as [`Variable].

   The result is a [path] together with the stream reached so far. When a
   reader raises [Need_more_data], the current step is saved as a
   [P_await] so that it can be replayed once more bytes are available. *)
let rec data_checker
  : type a.
    path -> a Encoding.t -> mbytes_stream -> int ->
    path * mbytes_stream =
  fun path e buf len ->
    (* The length of data with `Variable kind must be given by the caller. *)
    assert (Encoding.classify e != `Variable || len >= 0) ;
    (* Drop the decoded value of an atomic read and continue along [path]. *)
    let advance (stream, _) = next_path path stream in
    try match e.encoding with
      | Null -> next_path path buf
      | Empty -> next_path path buf
      | Constant _ -> next_path path buf
      | Ignore -> next_path path { buf with ofs = buf.ofs + len }
      | Bool -> advance (bool buf)
      | Int8 -> advance (int8 buf)
      | Uint8 -> advance (uint8 buf)
      | Int16 -> advance (int16 buf)
      | Uint16 -> advance (uint16 buf)
      | Int31 -> advance (int31 buf)
      | Int32 -> advance (int32 buf)
      | Int64 -> advance (int64 buf)

      | Z ->
          (* Consume bytes until one without the continuation bit (0x80);
             [index] counts the bytes already consumed. *)
          let rec skip_until_terminator index stream =
            let stream, byte = uint8 stream in
            if (byte land 0x80) <> 0x00 then
              skip_until_terminator (index + 1) stream
            else if byte = 0x00 && index <> 0 then
              failwith "trailing zeroes in Z encoding"
            else
              next_path path stream in
          skip_until_terminator 0 buf

      | RangedInt { minimum ; maximum } ->
          let stream, raw =
            match Binary_size.range_to_size ~minimum ~maximum with
            | `Int8 -> int8 buf
            | `Int16 -> int16 buf
            | `Int31 -> int31 buf
            | `Uint8 -> uint8 buf
            | `Uint16 -> uint16 buf
            | `Uint30 -> uint30 buf in
          let value = if minimum > 0 then raw + minimum else raw in
          assert (minimum <= value && value <= maximum) ;
          next_path path stream

      | Float -> advance (float buf)

      | RangedFloat { minimum ; maximum } ->
          let stream, value = float buf in
          assert (minimum <= value && maximum >= value) ;
          next_path path stream

      | Bytes (`Fixed n) -> advance (fixed_length_bytes n buf)
      | String (`Fixed n) -> advance (fixed_length_string n buf)
      | Bytes `Variable -> advance (fixed_length_bytes len buf)
      | String `Variable -> advance (fixed_length_string len buf)

      | String_enum (_, arr) ->
          advance
            (match Binary_size.enum_size arr with
             | `Uint8 -> uint8 buf
             | `Uint16 -> uint16 buf
             | `Uint30 -> uint30 buf)

      | Array e ->
          (* The elements are checked one at a time by [next_path]. *)
          let elements =
            P_list { path ; encoding = e ; base_ofs = buf.ofs ;
                     data_len = len ; nb_elts_read = 0 } in
          next_path elements buf

      | List e ->
          let elements =
            P_list { path ; encoding = e ; base_ofs = buf.ofs ;
                     data_len = len ; nb_elts_read = 0 } in
          next_path elements buf

      | Obj (Req (_, e)) -> data_checker path e buf len

      | Obj (Opt (`Dynamic, _, e)) ->
          (* One presence byte, then the value if it is present. *)
          let buf, flag = int8 buf in
          if flag = 0 then next_path path buf
          else data_checker path e buf (len - Binary_size.int8)

      | Obj (Opt (`Variable, _, e)) ->
          if len = 0 then next_path path buf
          else data_checker path e buf len

      | Obj (Dft (_, e, _)) -> data_checker path e buf len

      | Objs ((`Fixed _ | `Dynamic), e1, e2) ->
          (* [e2] receives what remains of [len] after [e1]. *)
          let right_len ofs' = len - (ofs' - buf.ofs) in
          let pending =
            P_seq { path ; encoding = e2 ; fun_data_len = right_len } in
          data_checker pending e1 buf len

      | Objs (`Variable, e1, e2) ->
          let left_len, right_len = varseq_lengths e1 e2 buf.ofs len in
          let pending =
            P_seq { path ; encoding = e2 ; fun_data_len = right_len } in
          data_checker pending e1 buf left_len

      | Tup e -> data_checker path e buf len

      | Tups ((`Fixed _ | `Dynamic), e1, e2) ->
          let right_len ofs' = len - (ofs' - buf.ofs) in
          let pending =
            P_seq { path ; encoding = e2 ; fun_data_len = right_len } in
          data_checker pending e1 buf len

      | Tups (`Variable, e1, e2) ->
          let left_len, right_len = varseq_lengths e1 e2 buf.ofs len in
          let pending =
            P_seq { path ; encoding = e2 ; fun_data_len = right_len } in
          data_checker pending e1 buf left_len

      | Conv { encoding = e } -> data_checker path e buf len
      | Describe { encoding = e } -> data_checker path e buf len
      | Def { encoding = e } -> data_checker path e buf len
      | Splitted { encoding = e } -> data_checker path e buf len
      | Mu (_, _, self) -> data_checker path (self e) buf len

      | Union (_, sz, cases) ->
          let buf, ctag = read_tag sz buf in
          (* Look for the case carrying the tag that was read; the
             assertion checks that at most one case matches. The checker
             for that case is left waiting for its length argument. *)
          let select acc case =
            match case with
            | (Case { encoding ; tag = Tag tag }) when tag == ctag ->
                assert (acc == None) ;
                Some (data_checker path encoding buf)
            | _ -> acc in
          begin match List.fold_left select None cases with
            | None -> raise (Encoding.Unexpected_tag ctag)
            | Some check -> check (len - (Binary_size.tag_size sz))
          end

      | Dynamic_size e ->
          (* The payload is prefixed by its size, stored as an int32. *)
          let buf, sz = int32 buf in
          let sz = Int32.to_int sz in
          if sz < 0 then raise (Encoding.Invalid_size sz) ;
          data_checker path e buf sz

      | Delayed f -> data_checker path (f ()) buf len

    with Need_more_data buf ->
      (* Remember the current step so that it can be replayed later. *)
      P_await { path ; encoding = e ; data_len = len }, buf

(* [next_path path buf] performs the next pending step recorded in [path]
   on the stream [buf]. *)
and next_path : path -> mbytes_stream -> path * mbytes_stream =
  fun path buf ->
    match path with
    | P_top ->
        (* success case *)
        P_top, buf

    | P_seq { path ; encoding ; fun_data_len } ->
        (* Check the right branch of a sequence: [fun_data_len], applied to
           the current offset, gives the length of the data to read. *)
        data_checker path encoding buf (fun_data_len buf.ofs)

    | P_await { path ; encoding ; data_len } ->
        (* Resume from an await. *)
        data_checker path encoding buf data_len

    | P_list
        ({ path ; encoding ; base_ofs ; data_len ; nb_elts_read } as r) ->
        (* Read/check a possible further element of a list. *)
        let consumed = buf.ofs - base_ofs in
        if data_len = consumed then
          (* All the elements of the list have been read. *)
          next_path path buf
        else begin
          (* Some more elements to read. *)
          assert (data_len > consumed) ;
          (* If some elements were already read, the current offset must be
             greater than the initial one. *)
          assert (nb_elts_read <= 0 || consumed > 0) ;
          let path =
            P_list { r with nb_elts_read = nb_elts_read + 1 } in
          data_checker path encoding buf data_len
        end
|
||||
|
||||
(* From here on, [data_checker] denotes [next_path]: it takes a path and a
   stream, and performs the pending step recorded in the path. *)
let data_checker = next_path
|
||||
|
||||
(* Append the chunk [mb] to the [future] queue of the stream [mb_buf],
   paired with its length, and add that length to the [unread] counter.
   Empty chunks are ignored. *)
let insert_mbytes mb_buf mb =
  let size = MBytes.length mb in
  if size <= 0 then ()
  else begin
    Queue.push (mb, size) mb_buf.future ;
    mb_buf.unread <- mb_buf.unread + size
  end
|
||||
|
||||
(* Helper called when [data_checker] succeeds: splits a stream into its
   'read' and 'unread' queues, which are returned as a pair. This may
   modify the queues of the given stream.

   When the current offset falls inside the first chunk of [future], that
   chunk is cut in two: the read part is appended to [past], and the unread
   part is put back in front of the remaining chunks.

   The unread part is queued together with its own length, so that the
   returned queue keeps the [(chunk, length of chunk)] pairing set up by
   [insert_mbytes]. *)
let split_mbytes_stream { past_len ; past ; future ; ofs ; _ } =
  let rel_ofs = ofs - past_len in
  assert (rel_ofs >= 0) ;
  if rel_ofs = 0 then past, future (* already done *)
  else begin
    (* Holds because data_checker succeeded. *)
    assert (not (Queue.is_empty future)) ;
    let b, len = Queue.pop future in
    (* Invariant maintained by read_from_many_blocks. *)
    assert (rel_ofs < len) ;
    let b1 = MBytes.sub b 0 rel_ofs in (* read part of b *)
    let b2_len = len - rel_ofs in
    let b2 = MBytes.sub b rel_ofs b2_len in (* unread part of b *)
    Queue.push b1 past ;
    (* Push b2 at the beginning of 'future' using Queue.transfer:
       tmp === b2 ::: future, in constant time. *)
    let tmp = Queue.create () in
    Queue.push (b2, b2_len) tmp ;
    Queue.transfer future tmp ;
    past, tmp
  end
|
||||
|
||||
(* Turn a checker state [(path, mb_buf)] into a status:
   - [P_top]: the data were checked successfully. The chunks read so far
     and the current offset are passed to [success_result]. If it returns
     [Some a], the result is [Success], carrying [a], the offset and the
     non-empty unread chunks; if it returns [None], the result is [Error]
     ([success_result] may fail because [data_checker] is approximate in
     some situations);
   - [P_await _]: more data are needed. The result is an [Await] whose
     function inserts the new chunk, re-runs the checker and recurses; any
     exception raised while doing so yields [Error];
   - any other path is not expected here. *)
let rec bytes_stream_reader_rec (path, mb_buf) success_result =
  let finished =
    match path with
    | P_top -> true
    | P_await _ -> false
    | _ -> assert false
  in
  assert (mb_buf.ofs >= mb_buf.past_len) ;
  if not finished then
    Await
      (fun mb ->
         insert_mbytes mb_buf mb ;
         try
           bytes_stream_reader_rec (data_checker path mb_buf) success_result
         with _ -> Error)
  else
    let q_read, q_unread = split_mbytes_stream mb_buf in
    match success_result q_read mb_buf.ofs with
    | None -> Error
    | Some a ->
        (* Keep the chunks whose recorded length is not zero, in queue
           order. *)
        let kept_rev =
          Queue.fold
            (fun acc (chunk, size) ->
               if size = 0 then acc else chunk :: acc)
            [] q_unread in
        let remaining = List.rev kept_rev in
        Success { res = a ; res_len = mb_buf.ofs ; remaining }
|
||||
|
||||
(* Check the reading of a stream of [MBytes.t] against an encoding.

   [bytes_stream_reader l e success_result] feeds the initial chunks [l]
   to the checker for [e] and returns:
   - [Success] when the data were checked; its result is computed by
     [success_result] from the queue of chunks read and the final offset,
   - [Await w] when more data are needed, [w] being the function that
     takes the next chunk,
   - [Error] when the check fails (any exception raised while checking is
     turned into [Error]).

   The encoding must be classified as [`Fixed] or [`Dynamic]: for a
   [`Variable] encoding, [Invalid_argument "streaming data with variable
   size"] is raised. *)
let bytes_stream_reader :
  MBytes.t list -> 'a t ->
  (MBytes.t Queue.t -> int -> 'b option) -> 'b status
  = fun l e success_result ->
    match classify e with
    | `Fixed _ | `Dynamic ->
        let mb_buf = {
          past = Queue.create() ;
          past_len = 0 ;
          future = Queue.create() ;
          unread = 0 ;
          ofs = 0 ;
        } in
        List.iter (fun chunk -> insert_mbytes mb_buf chunk) l ;
        (* Start from an await on the whole encoding. *)
        let start =
          P_await { path = P_top ; encoding = e ; data_len = - 1 } in
        begin
          try bytes_stream_reader_rec (data_checker start mb_buf) success_result
          with _ -> Error
        end
    | `Variable -> invalid_arg "streaming data with variable size"
|
||||
|
||||
(* Concatenate the chunks of [queue] into a single [MBytes.t] of length
   [tot_len]. The queue is emptied along the way. When the queue holds
   exactly one chunk, that chunk is returned as is, without any copy.
   Assertions check that the lengths of the chunks add up to [tot_len]. *)
let concat_mbyte_chunks queue tot_len =
  if Queue.length queue = 1 then Queue.pop queue (* no copy *)
  else begin
    (* Copy the smaller chunks into one big buffer. *)
    let buf = MBytes.create tot_len in
    (* [written]: bytes copied so far; [missing]: bytes still expected. *)
    let rec drain written missing =
      if Queue.is_empty queue then missing
      else begin
        let mb = Queue.pop queue in
        let len = MBytes.length mb in
        let missing = missing - len in
        assert (missing >= 0) ;
        MBytes.blit mb 0 buf written len ;
        drain (written + len) missing
      end
    in
    let missing = drain 0 tot_len in
    assert (missing = 0) ;
    buf
  end
|
||||
|
||||
(* Decode a stream of MBytes: runs [bytes_stream_reader] on the optional
   initial chunks [init] (none by default). On success, the chunks read
   are concatenated and decoded with [Binary.of_bytes]. *)
let read_stream_of_bytes ?(init=[]) encoding =
  let decode read_q ofs =
    let bytes = concat_mbyte_chunks read_q ofs in
    Binary.of_bytes encoding bytes in
  bytes_stream_reader init encoding decode
|
||||
|
||||
(* Check the reading of a stream of MBytes: same as
   [read_stream_of_bytes], except that nothing is decoded on success and
   the result is always [()]. *)
let check_stream_of_bytes ?(init=[]) encoding =
  let accept _read_q _ofs = Some () in
  bytes_stream_reader init encoding accept
|
367
src/lib_data_encoding/binary_stream_reader.ml
Normal file
367
src/lib_data_encoding/binary_stream_reader.ml
Normal file
@ -0,0 +1,367 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
open Binary_error
|
||||
|
||||
(** Shadows [raise] for the rest of this file: it takes a [read_error] and
    raises it wrapped in the [Read_error] exception. *)
let raise e = raise (Read_error e)
|
||||
|
||||
(** Persistent state of the binary reader. *)
type state = {

  stream : Binary_stream.t ;
  (** All the remaining data to be read. *)

  remaining_bytes : int option ;
  (** Total number of bytes that should be read from [stream] ([None] =
      unlimited). Reading fewer bytes should raise [Extra_bytes] and
      trying to read more bytes should raise [Not_enough_data]. *)

  total_read : int ;
  (** Total number of bytes that have been read from [stream] since the
      beginning. *)

}
|
||||
|
||||
(** Return type for the function [read_rec]. See [Data_encoding] for its
    description.
    - [Success]: the decoded value, a size (filled from the reader's
      [total_read] counter) and the stream left after reading;
    - [Await]: more data are needed; the function takes the next chunk
      and returns the next status;
    - [Error]: reading failed with the given error. *)
type 'ret status =
  | Success of {
      result : 'ret ;
      size : int ;
      stream : Binary_stream.t ;
    }
  | Await of (MBytes.t -> 'ret status)
  | Error of read_error
|
||||
|
||||
(** [check_remaining_bytes state size] computes the value of
    [remaining_bytes] after reading [size] bytes from [state]: [None] when
    there is no limit, [Some (len - size)] otherwise.
    Raises [Read_error Not_enough_data] when fewer than [size] bytes
    remain. *)
let check_remaining_bytes state size =
  match state.remaining_bytes with
  | None -> None
  | Some len ->
      if len < size then raise Not_enough_data
      else Some (len - size)
|
||||
|
||||
(** [read_atom resume size conv state k] reads [size] bytes from [state],
    passes them to [conv] to be decoded, and finally calls the continuation
    [k] with the decoded value and the updated state.

    The function [conv] is also allowed to raise [Read_error err]. In that
    case the exception is caught and [Error err] is returned.

    If there are not enough [remaining_bytes] in [state], the function
    returns [Error Not_enough_data] instead of calling the continuation.

    If the stream does not hold enough bytes yet, the function returns
    [Await resume] instead of calling the continuation.

    Exceptions raised by [k] itself are not caught here. *)
let read_atom resume size conv state k =
  let decode () =
    let remaining_bytes = check_remaining_bytes state size in
    let res, stream = Binary_stream.read state.stream size in
    let value = conv res.buffer res.ofs in
    (value,
     { remaining_bytes ; stream ;
       total_read = state.total_read + size })
  in
  match decode () with
  | decoded -> k decoded (* tail call *)
  | exception (Read_error error) -> Error error
  | exception Binary_stream.Need_more_data -> Await resume
|
||||
|
||||
(** Reader for all the atomic types.

    Every reader follows the calling convention of [read_atom]: it takes
    the [resume] function to embed in [Await] when data are missing, the
    current [state], and a continuation [k] receiving the decoded value
    paired with the updated state. *)
module Atom = struct

  let uint8 r = read_atom r Binary_size.uint8 MBytes.get_uint8
  let uint16 r = read_atom r Binary_size.uint16 MBytes.get_uint16

  let int8 r = read_atom r Binary_size.int8 MBytes.get_int8
  let int16 r = read_atom r Binary_size.int16 MBytes.get_int16
  let int32 r = read_atom r Binary_size.int32 MBytes.get_int32
  let int64 r = read_atom r Binary_size.int64 MBytes.get_int64

  let float r = read_atom r Binary_size.float MBytes.get_double

  (** A boolean is read as an [int8]; any non-zero value means [true]. *)
  let bool resume state k =
    int8 resume state @@ fun (v, state) ->
    k (v <> 0, state)

  (** Read as an int32; negative values are rejected with [Invalid_int]. *)
  let uint30 r =
    read_atom r Binary_size.uint30 @@ fun buffer ofs ->
    let v = Int32.to_int (MBytes.get_int32 buffer ofs) in
    if v < 0 then
      raise (Invalid_int { min = 0 ; v ; max = (1 lsl 30) - 1 }) ;
    v

  let int31 r =
    read_atom r Binary_size.int31 @@ fun buffer ofs ->
    Int32.to_int (MBytes.get_int32 buffer ofs)

  (** Read an integer with the reader selected by
      [Binary_size.range_to_size]. When [minimum] is positive, it is added
      to the value read. Returns [Error (Invalid_int _)] when the result
      lies outside of [minimum .. maximum]. *)
  let ranged_int ~minimum ~maximum resume state k =
    let read_int =
      match Binary_size.range_to_size ~minimum ~maximum with
      | `Int8 -> int8
      | `Int16 -> int16
      | `Int31 -> int31
      | `Uint8 -> uint8
      | `Uint16 -> uint16
      | `Uint30 -> uint30 in
    read_int resume state @@ fun (ranged, state) ->
    let ranged = if minimum > 0 then ranged + minimum else ranged in
    if not (minimum <= ranged && ranged <= maximum) then
      Error (Invalid_int { min = minimum ; v =ranged ; max = maximum })
    else
      k (ranged, state)

  (** Read a float and return [Error (Invalid_float _)] when it lies
      outside of [minimum .. maximum]. *)
  let ranged_float ~minimum ~maximum resume state k =
    float resume state @@ fun (ranged, state) ->
    if not (minimum <= ranged && ranged <= maximum) then
      Error (Invalid_float { min = minimum ; v = ranged ; max = maximum })
    else
      k (ranged, state)

  (** Read a [Z.t] stored on a variable number of bytes.

      In every byte, bit 0x80 tells whether another byte follows. The first
      byte carries the sign in bit 0x40 and six bits of value; each
      following byte carries seven bits of value. The value bits are
      packed into [res] and converted with [Z.of_bits]. A final byte equal
      to zero (other than a lone first byte) is rejected with
      [Trailing_zero].

      After the first byte, waiting for more data resumes from the byte
      that is missing (and not from the beginning of the number): the
      local [resume] pushes the new chunk on the state reached so far. It
      is recursive, so that it is also the function embedded in [Await]
      when the pushed chunk is still not sufficient, and it turns any
      [Read_error] raised while resuming into [Error], like the [resume]
      function of [read_rec]. *)
  let z resume state k =
    let res = Buffer.create 100 in
    uint8 resume state @@ fun (first, state) ->
    if first = 0 then
      k (Z.zero, state)
    else
      let first_value = first land 0x3F in
      let sign = (first land 0x40) <> 0 in
      (* [prev]: last byte read; [value]/[bit]: pending value bits that are
         not flushed to [res] yet, and how many of them there are. *)
      let rec read prev value bit state =
        if prev land 0x80 = 0x00 then begin
          if bit > 0 then Buffer.add_char res (Char.unsafe_chr value) ;
          if prev = 0x00 then raise Trailing_zero ;
          let bits = Buffer.contents res in
          let res = Z.of_bits bits in
          let res = if sign then Z.neg res else res in
          k (res, state)
        end else
          let rec resume buffer =
            let stream = Binary_stream.push buffer state.stream in
            try uint8 resume { state with stream } (read_next value bit)
            with Read_error err -> Error err in
          uint8 resume state (read_next value bit)
      and read_next value bit (byte, state) =
        let value = value lor ((byte land 0x7F) lsl bit) in
        let bit = bit + 7 in
        let bit, value =
          if bit >= 8 then begin
            (* A full byte is available: flush it. *)
            Buffer.add_char res (Char.unsafe_chr (value land 0xFF)) ;
            bit - 8, value lsr 8
          end else
            bit, value in
        read byte value bit state in
      read first first_value 6 state

  (** Read an index with the reader selected by [Binary_size.enum_size]
      and return the corresponding element of [arr], or
      [Error No_case_matched] when the index is out of bounds. *)
  let string_enum arr resume state k =
    let read_index =
      match Binary_size.enum_size arr with
      | `Uint8 -> uint8
      | `Uint16 -> uint16
      | `Uint30 -> uint30 in
    read_index resume state @@ fun (index, state) ->
    if index >= Array.length arr then
      Error No_case_matched
    else
      k (arr.(index), state)

  let fixed_length_bytes length r =
    read_atom r length @@ fun buf ofs ->
    MBytes.sub buf ofs length

  let fixed_length_string length r =
    read_atom r length @@ fun buf ofs ->
    MBytes.sub_string buf ofs length

  (** Select the reader matching the width of a union tag. *)
  let tag = function
    | `Uint8 -> uint8
    | `Uint16 -> uint16

end
|
||||
|
||||
(** Main recursive reading function, in continuation passing style.

    [read_rec e state k] reads a value of encoding [e] from [state] and
    passes it, with the updated state, to the continuation [k]. When data
    are missing, the atomic readers return an [Await] embedding [resume],
    which pushes the new chunk on the stream of [state] and restarts the
    reading of [e] from there.

    A [`Variable] encoding can only be read when [remaining_bytes] is
    known. *)
let rec read_rec
  : type next ret.
    next Encoding.t -> state -> ((next * state) -> ret status) -> ret status
  = fun e state k ->
    let resume buffer =
      let stream = Binary_stream.push buffer state.stream in
      try read_rec e { state with stream } k
      with Read_error err -> Error err in
    let open Encoding in
    assert (Encoding.classify e <> `Variable || state.remaining_bytes <> None) ;
    match e.encoding with
    (* Encodings for which nothing is read. *)
    | Null -> k ((), state)
    | Empty -> k ((), state)
    | Constant _ -> k ((), state)
    | Ignore -> k ((), state)
    (* Atomic encodings. *)
    | Bool -> Atom.bool resume state k
    | Int8 -> Atom.int8 resume state k
    | Uint8 -> Atom.uint8 resume state k
    | Int16 -> Atom.int16 resume state k
    | Uint16 -> Atom.uint16 resume state k
    | Int31 -> Atom.int31 resume state k
    | Int32 -> Atom.int32 resume state k
    | Int64 -> Atom.int64 resume state k
    | Z -> Atom.z resume state k
    | Float -> Atom.float resume state k
    | Bytes (`Fixed n) -> Atom.fixed_length_bytes n resume state k
    | Bytes `Variable ->
        (* Variable-length data span all the remaining bytes. *)
        Atom.fixed_length_bytes (remaining_bytes state) resume state k
    | String (`Fixed n) -> Atom.fixed_length_string n resume state k
    | String `Variable ->
        Atom.fixed_length_string (remaining_bytes state) resume state k
    | RangedInt { minimum ; maximum } ->
        Atom.ranged_int ~minimum ~maximum resume state k
    | RangedFloat { minimum ; maximum } ->
        Atom.ranged_float ~minimum ~maximum resume state k
    | String_enum (_, arr) ->
        Atom.string_enum arr resume state k
    (* Collections. *)
    | Array e ->
        read_list e state @@ fun (elements, state) ->
        k (Array.of_list elements, state)
    | List e -> read_list e state k
    (* Objects and tuples. *)
    | Obj (Req (_, e)) -> read_rec e state k
    | Obj (Dft (_, e, _)) -> read_rec e state k
    | Obj (Opt (`Dynamic, _, e)) ->
        (* A boolean tells whether the value is present. *)
        Atom.bool resume state @@ fun (present, state) ->
        if present then
          read_rec e state @@ fun (v, state) ->
          k (Some v, state)
        else
          k (None, state)
    | Obj (Opt (`Variable, _, e)) ->
        (* The value is absent when no bytes are left. *)
        if remaining_bytes state = 0 then
          k (None, state)
        else
          read_rec e state @@ fun (v, state) ->
          k (Some v, state)
    | Objs (`Fixed sz, e1, e2) ->
        (* Fail early when the [sz] bytes cannot all be read. *)
        ignore (check_remaining_bytes state sz : int option) ;
        read_pair e1 e2 state k
    | Objs (`Dynamic, e1, e2) -> read_pair e1 e2 state k
    | Objs (`Variable, e1, e2) -> read_variable_pair e1 e2 state k
    | Tup e -> read_rec e state k
    | Tups (`Fixed sz, e1, e2) ->
        ignore (check_remaining_bytes state sz : int option) ;
        read_pair e1 e2 state k
    | Tups (`Dynamic, e1, e2) -> read_pair e1 e2 state k
    | Tups (`Variable, e1, e2) -> read_variable_pair e1 e2 state k
    | Conv { inj ; encoding } ->
        read_rec encoding state @@ fun (v, state) ->
        k (inj v, state)
    | Union (_, sz, cases) ->
        Atom.tag sz resume state @@ fun (ctag, state) ->
        (* Only the cases carrying a binary tag can match. *)
        let has_tag = function
          | Case { tag = Tag tag } -> tag = ctag
          | Case { tag = Json_only } -> false in
        begin match List.find has_tag cases with
          | exception Not_found -> Error (Unexpected_tag ctag)
          | Case { encoding ; inj } ->
              read_rec encoding state @@ fun (v, state) ->
              k (inj v, state)
        end
    | Dynamic_size e ->
        (* The payload is prefixed by its size, stored as an int32. *)
        Atom.int32 resume state @@ fun (sz, state) ->
        let sz = Int32.to_int sz in
        if sz < 0 then
          Error (Invalid_size sz)
        else
          (* [outer]: what remains in the enclosing scope after the
             payload; the payload itself is read with a limit of [sz]. *)
          let outer = check_remaining_bytes state sz in
          let inner = { state with remaining_bytes = Some sz } in
          read_rec e inner @@ fun (v, state) ->
          if state.remaining_bytes <> Some 0 then
            Error Extra_bytes
          else
            k (v, { state with remaining_bytes = outer })
    (* Wrappers around another encoding. *)
    | Describe { encoding = e } -> read_rec e state k
    | Def { encoding = e } -> read_rec e state k
    | Splitted { encoding = e } -> read_rec e state k
    | Mu (_, _, self) -> read_rec (self e) state k
    | Delayed f -> read_rec (f ()) state k

(** Number of bytes left to read in the current scope. *)
and remaining_bytes { remaining_bytes } =
  match remaining_bytes with
  | Some len -> len
  | None ->
      (* This function should only be called with a variable encoding,
         for which the `remaining_bytes` should never be `None`. *)
      assert false

(** Read a value of [e1] and then a value of [e2], and pass them as a pair
    to the continuation. *)
and read_pair
  : type left right ret.
    left Encoding.t -> right Encoding.t -> state ->
    (((left * right) * state) -> ret status) -> ret status
  = fun e1 e2 state k ->
    read_rec e1 state @@ fun (left, state) ->
    read_rec e2 state @@ fun (right, state) ->
    k ((left, right), state)

(** Read a pair whose kind is [`Variable]. Either the right component is
    the variable one and is read after the left one, or the left component
    is the variable one and the right one occupies the last [n] bytes. *)
and read_variable_pair
  : type left right ret.
    left Encoding.t -> right Encoding.t -> state ->
    (((left * right) * state) -> ret status) -> ret status
  = fun e1 e2 state k ->
    let size = remaining_bytes state in
    match Encoding.classify e1, Encoding.classify e2 with
    | (`Dynamic | `Fixed _), `Variable ->
        read_pair e1 e2 state k
    | `Variable, `Fixed n ->
        if n > size then
          Error Not_enough_data
        else
          (* Keep the last [n] bytes aside for the right component. *)
          let state = { state with remaining_bytes = Some (size - n) } in
          read_rec e1 state @@ fun (left, state) ->
          assert (state.remaining_bytes = Some 0) ;
          let state = { state with remaining_bytes = Some n } in
          read_rec e2 state @@ fun (right, state) ->
          assert (state.remaining_bytes = Some 0) ;
          k ((left, right), state)
    | _ -> assert false (* Should be rejected by [Encoding.Kind.combine] *)

(** Read values of [e] until no bytes are left in the current scope, and
    pass them, in reading order, to the continuation. *)
and read_list
  : type a ret.
    a Encoding.t -> state -> ((a list * state) -> ret status) -> ret status
  = fun e state k ->
    let rec loop state acc =
      if remaining_bytes state = 0 then
        k (List.rev acc, state)
      else
        read_rec e state @@ fun (v, state) ->
        loop state (v :: acc) in
    loop state []
|
||||
|
||||
(** Same as the recursive [read_rec] defined above, except that a
    [Read_error err] raised while reading is returned as [Error err]. *)
let read_rec e state k =
  match read_rec e state k with
  | status -> status
  | exception Read_error err -> Error err
|
||||
|
||||
|
||||
|
||||
(** ******************** *)
|
||||
(** Various entry points *)
|
||||
|
||||
(** Final continuation: wraps the decoded value in [Success], with the
    number of bytes read and the stream left in the state. *)
let success (result, final) =
  let size = final.total_read in
  let stream = final.stream in
  Success { result ; size ; stream }
|
||||
|
||||
(** [read_stream ?init encoding] starts reading a value of [encoding]
    from the stream [init] (empty by default).
    @raise Invalid_argument if [encoding] is classified as [`Variable]. *)
let read_stream ?(init = Binary_stream.empty) encoding =
  match Encoding.classify encoding with
  | `Dynamic | `Fixed _ ->
      (* No hardcoded read limit in a stream. *)
      let initial =
        { stream = init ; remaining_bytes = None ; total_read = 0 } in
      read_rec encoding initial success
  | `Variable ->
      invalid_arg "Data_encoding.Binary.read_stream: variable encoding"
|
18
src/lib_data_encoding/binary_stream_reader.mli
Normal file
18
src/lib_data_encoding/binary_stream_reader.mli
Normal file
@ -0,0 +1,18 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** This is for use *within* the data encoding library only. Instead, you should
    use the corresponding module intended for use: {!Data_encoding.Binary}. *)
|
||||
|
||||
(** Result of reading a stream incrementally:
    - [Success]: the decoded value [result], together with a [size] and
      the [stream] left after reading;
    - [Await]: more data are needed; the function takes the next chunk
      and returns the next status;
    - [Error]: reading failed with the given error. *)
type 'ret status =
  | Success of {
      result : 'ret ;
      size : int ;
      stream : Binary_stream.t ;
    }
  | Await of (MBytes.t -> 'ret status)
  | Error of Binary_error.read_error
|
||||
|
||||
(** [read_stream ?init encoding] starts reading a value of [encoding] from
    the optional initial stream [init], and returns the resulting status. *)
val read_stream: ?init:Binary_stream.t -> 'a Encoding.t -> 'a status
|
@ -23,7 +23,9 @@ module Json = Json
|
||||
module Bson = Bson
|
||||
module Binary = struct
|
||||
include Binary
|
||||
include Binary_stream
|
||||
include Binary_error
|
||||
include Binary_reader
|
||||
include Binary_stream_reader
|
||||
end
|
||||
|
||||
type json = Json.t
|
||||
|
@ -534,29 +534,25 @@ module Binary: sig
|
||||
val of_bytes : 'a Encoding.t -> MBytes.t -> 'a option
|
||||
val of_bytes_exn : 'a Encoding.t -> MBytes.t -> 'a
|
||||
|
||||
(** This type is used when decoding binary data incrementally.
|
||||
- In case of 'Success', the decoded data, the size of used data
|
||||
to decode the result, and the remaining data are returned
|
||||
- In case of error, 'Error' is returned
|
||||
- 'Await' status embeds a function that waits for additional data
|
||||
to continue decoding, when given data are not sufficient *)
|
||||
type 'a status =
|
||||
| Success of { res : 'a ; res_len : int ; remaining : MBytes.t list }
|
||||
| Await of (MBytes.t -> 'a status)
|
||||
| Error
|
||||
type read_error =
|
||||
| Not_enough_data
|
||||
| Extra_bytes
|
||||
| No_case_matched
|
||||
| Unexpected_tag of int
|
||||
| Invalid_size of int
|
||||
| Invalid_int of { min : int ; v : int ; max : int }
|
||||
| Invalid_float of { min : float ; v : float ; max : float }
|
||||
| Trailing_zero
|
||||
exception Read_error of read_error
|
||||
val pp_read_error: Format.formatter -> read_error -> unit
|
||||
|
||||
(** This function allows to decode (or to initialize decoding) a
|
||||
stream of 'MByte.t'. The given data encoding should have a
|
||||
'Fixed' or a 'Dynamic' size, otherwise an exception
|
||||
'Invalid_argument "streaming data with variable size"' is
|
||||
raised *)
|
||||
val read_stream_of_bytes : ?init:MBytes.t list -> 'a Encoding.t -> 'a status
|
||||
type 'ret status =
|
||||
| Success of { result : 'ret ; size : int ; stream : Binary_stream.t }
|
||||
| Await of (MBytes.t -> 'ret status)
|
||||
| Error of read_error
|
||||
|
||||
val read_stream: ?init:Binary_stream.t -> 'a Encoding.t -> 'a status
|
||||
|
||||
(** Like read_stream_of_bytes, but only checks that the stream can
|
||||
be read. Note that this is an approximation because failures
|
||||
that may come from conversion functions present in encodings are
|
||||
not checked *)
|
||||
val check_stream_of_bytes : ?init:MBytes.t list -> 'a Encoding.t -> unit status
|
||||
val fixed_length : 'a Encoding.t -> int option
|
||||
val fixed_length_exn : 'a Encoding.t -> int
|
||||
|
||||
|
@ -22,10 +22,10 @@ let no_exception f =
|
||||
Alcotest.failf
|
||||
"@[v 2>json failed:@ %a@]"
|
||||
(fun ppf -> Json_encoding.print_error ppf) exn
|
||||
| exn ->
|
||||
| Binary.Read_error error ->
|
||||
Alcotest.failf
|
||||
"@[v 2>unexpected exception:@ %s@]"
|
||||
(Printexc.to_string exn)
|
||||
"@[v 2>bytes reading failed:@ %a@]"
|
||||
Binary.pp_read_error error
|
||||
|
||||
let check_raises expected f =
|
||||
match f () with
|
||||
@ -40,14 +40,13 @@ let chunked_read sz encoding bytes =
|
||||
(fun status chunk ->
|
||||
match status with
|
||||
| Binary.Await f -> f chunk
|
||||
| Success _ when MBytes.length chunk <> 0 -> Error
|
||||
| Success _ | Error -> status)
|
||||
(Binary.read_stream_of_bytes encoding)
|
||||
| Success _ when MBytes.length chunk <> 0 -> Error Extra_bytes
|
||||
| Success _ | Error _ -> status)
|
||||
(Binary.read_stream encoding)
|
||||
(MBytes.cut sz bytes) in
|
||||
match status with
|
||||
| Success { remaining ; _ } when
|
||||
List.exists (fun b -> MBytes.length b <> 0) remaining ->
|
||||
Binary.Error
|
||||
| Success { stream ; _ } when not (Binary_stream.is_empty stream) ->
|
||||
Binary.Error Extra_bytes
|
||||
| _ -> status
|
||||
|
||||
let streamed_read encoding bytes =
|
||||
@ -55,6 +54,6 @@ let streamed_read encoding bytes =
|
||||
(fun (status, count as acc) chunk ->
|
||||
match status with
|
||||
| Binary.Await f -> (f chunk, succ count)
|
||||
| Success _ | Error -> acc)
|
||||
(Binary.read_stream_of_bytes encoding, 0)
|
||||
| Success _ | Error _ -> acc)
|
||||
(Binary.read_stream encoding, 0)
|
||||
(MBytes.cut 1 bytes)
|
||||
|
@ -14,19 +14,19 @@ open Helpers
|
||||
open Types
|
||||
|
||||
let not_enough_data = function
|
||||
| Invalid_argument _ -> true
|
||||
| Binary.Read_error Not_enough_data -> true
|
||||
| _ -> false
|
||||
|
||||
let extra_bytes = function
|
||||
| Failure _ -> true
|
||||
| Binary.Read_error Extra_bytes -> true
|
||||
| _ -> false
|
||||
|
||||
let trailing_zero = function
|
||||
| Failure _ -> true
|
||||
| Binary.Read_error Trailing_zero -> true
|
||||
| _ -> false
|
||||
|
||||
let invalid_int = function
|
||||
| Data_encoding.Int_out_of_range _ -> true
|
||||
| Binary.Read_error (Invalid_int _) -> true
|
||||
| Json_encoding.Cannot_destruct ([] , Failure _) -> true
|
||||
| _ -> false
|
||||
|
||||
@ -35,17 +35,17 @@ let invalid_string_length = function
|
||||
([], Json_encoding.Unexpected ("string (len 9)", "string (len 4)")) -> true
|
||||
| Json_encoding.Cannot_destruct
|
||||
([], Json_encoding.Unexpected ("bytes (len 9)", "bytes (len 4)")) -> true
|
||||
| Failure _ -> true
|
||||
| Binary.Read_error Extra_bytes -> true
|
||||
| _ -> false
|
||||
|
||||
let missing_case = function
|
||||
| Json_encoding.Cannot_destruct ([], Json_encoding.No_case_matched _ ) -> true
|
||||
| Unexpected_tag _ -> true
|
||||
| Binary.Read_error (Unexpected_tag _) -> true
|
||||
| _ -> false
|
||||
|
||||
let missing_enum = function
|
||||
| Json_encoding.Cannot_destruct ([], Json_encoding.Unexpected _ ) -> true
|
||||
| No_case_matched -> true
|
||||
| Binary.Read_error No_case_matched -> true
|
||||
| _ -> false
|
||||
|
||||
let json ?(expected = fun _ -> true) read_encoding json () =
|
||||
@ -63,7 +63,7 @@ let binary ?(expected = fun _ -> true) read_encoding bytes () =
|
||||
ignore (Binary.of_bytes_exn read_encoding bytes) ;
|
||||
end
|
||||
|
||||
let stream read_encoding bytes () =
|
||||
let stream ?(expected = fun _ -> true) read_encoding bytes () =
|
||||
let len_data = MBytes.length bytes in
|
||||
for sz = 1 to max 1 len_data do
|
||||
let name = Format.asprintf "stream (%d)" sz in
|
||||
@ -72,7 +72,13 @@ let stream read_encoding bytes () =
|
||||
Alcotest.failf "%s failed: expecting exception, got success." name
|
||||
| Binary.Await _ ->
|
||||
Alcotest.failf "%s failed: not enough data" name
|
||||
| Error -> ()
|
||||
| Binary.Error error when expected (Binary.Read_error error) ->
|
||||
()
|
||||
| Binary.Error error ->
|
||||
Alcotest.failf
|
||||
"@[<v 2>%s failed: read error@ %a@]"
|
||||
name
|
||||
Binary.pp_read_error error
|
||||
done
|
||||
|
||||
let all ?expected name write_encoding read_encoding value =
|
||||
@ -82,7 +88,7 @@ let all ?expected name write_encoding read_encoding value =
|
||||
[ name ^ ".json", `Quick, json ?expected read_encoding json_value ;
|
||||
name ^ ".bson", `Quick, bson ?expected read_encoding bson_value ;
|
||||
name ^ ".bytes", `Quick, binary ?expected read_encoding bytes_value ;
|
||||
name ^ ".stream", `Quick, stream read_encoding bytes_value ;
|
||||
name ^ ".stream", `Quick, stream ?expected read_encoding bytes_value ;
|
||||
]
|
||||
|
||||
let all_ranged_int minimum maximum =
|
||||
|
@ -48,15 +48,18 @@ let stream ty encoding value () =
|
||||
for sz = 1 to max 1 len_data do
|
||||
let name = Format.asprintf "stream (%d)" sz in
|
||||
match chunked_read sz encoding bytes with
|
||||
| Binary.Success { res = result ; res_len = size ; remaining } ->
|
||||
| Binary.Success { result ; size ; stream } ->
|
||||
if size <> MBytes.length bytes ||
|
||||
List.exists (fun b -> MBytes.length b <> 0) remaining then
|
||||
not (Binary_stream.is_empty stream) then
|
||||
Alcotest.failf "%s failed: remaining data" name ;
|
||||
Alcotest.check ty name value result
|
||||
| Binary.Await _ ->
|
||||
Alcotest.failf "%s failed: not enough data" name
|
||||
| Binary.Error ->
|
||||
Alcotest.failf "@[<v 2>%s failed: read error@]" name
|
||||
| Binary.Error error ->
|
||||
Alcotest.failf
|
||||
"@[<v 2>%s failed: read error@ %a@]"
|
||||
name
|
||||
Binary.pp_read_error error
|
||||
done ;
|
||||
end
|
||||
|
||||
@ -97,10 +100,12 @@ let all_ranged_float minimum maximum =
|
||||
all (name ^ ".max") Alcotest.float encoding maximum
|
||||
|
||||
let test_z_sequence () =
|
||||
let test i = binary Alcotest.z z i () in
|
||||
for i = -1_00_000 to 1_00_000 do test (Z.of_int i) done ;
|
||||
for i = 100_000_000 to 100_100_000 do test (Z.of_int i) done ;
|
||||
for i = -100_000_000 downto -100_100_000 do test (Z.of_int i) done
|
||||
let test i =
|
||||
binary Alcotest.z z i () ;
|
||||
stream Alcotest.z z i () in
|
||||
for i = -10_000 to 10_000 do test (Z.of_int i) done ;
|
||||
for i = 100_000_000 to 100_010_000 do test (Z.of_int i) done ;
|
||||
for i = -100_000_000 downto -100_010_000 do test (Z.of_int i) done
|
||||
|
||||
let test_string_enum_boundary () =
|
||||
let entries = List.rev_map (fun x -> string_of_int x, x) (0 -- 254) in
|
||||
|
@ -233,14 +233,14 @@ module Reader = struct
|
||||
mutable worker: unit Lwt.t ;
|
||||
}
|
||||
|
||||
let read_message st init_mbytes =
|
||||
let read_message st init =
|
||||
let rec loop status =
|
||||
Lwt_unix.yield () >>= fun () ->
|
||||
let open Data_encoding.Binary in
|
||||
match status with
|
||||
| Success { res ; res_len ; remaining } ->
|
||||
return (Some (res, res_len, remaining))
|
||||
| Error ->
|
||||
| Success { result ; size ; stream } ->
|
||||
return (Some (result, size, stream))
|
||||
| Error _ ->
|
||||
lwt_debug "[read_message] incremental decoding error" >>= fun () ->
|
||||
return None
|
||||
| Await decode_next_buf ->
|
||||
@ -251,27 +251,26 @@ module Reader = struct
|
||||
"reading %d bytes from %a"
|
||||
(MBytes.length buf) P2p_connection.Info.pp st.conn.info >>= fun () ->
|
||||
loop (decode_next_buf buf) in
|
||||
loop
|
||||
(Data_encoding.Binary.read_stream_of_bytes ~init:init_mbytes st.encoding)
|
||||
loop (Data_encoding.Binary.read_stream ?init st.encoding)
|
||||
|
||||
|
||||
let rec worker_loop st init_mbytes =
|
||||
let rec worker_loop st stream =
|
||||
begin
|
||||
read_message st init_mbytes >>=? fun msg ->
|
||||
read_message st stream >>=? fun msg ->
|
||||
match msg with
|
||||
| None ->
|
||||
protect ~canceler:st.canceler begin fun () ->
|
||||
Lwt_pipe.push st.messages (Error [P2p_errors.Decoding_error]) >>= fun () ->
|
||||
return None
|
||||
end
|
||||
| Some (msg, size, rem_mbytes) ->
|
||||
| Some (msg, size, stream) ->
|
||||
protect ~canceler:st.canceler begin fun () ->
|
||||
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
|
||||
return (Some rem_mbytes)
|
||||
return (Some stream)
|
||||
end
|
||||
end >>= function
|
||||
| Ok Some rem_mbytes ->
|
||||
worker_loop st rem_mbytes
|
||||
| Ok (Some stream) ->
|
||||
worker_loop st (Some stream)
|
||||
| Ok None ->
|
||||
Lwt_canceler.cancel st.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
@ -301,7 +300,7 @@ module Reader = struct
|
||||
end ;
|
||||
st.worker <-
|
||||
Lwt_utils.worker "reader"
|
||||
~run:(fun () -> worker_loop st [])
|
||||
~run:(fun () -> worker_loop st None)
|
||||
~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
|
||||
st
|
||||
|
||||
|
81
src/lib_stdlib/binary_stream.ml
Normal file
81
src/lib_stdlib/binary_stream.ml
Normal file
@ -0,0 +1,81 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(* Facilities to decode streams of binary data *)
|
||||
|
||||
(** A window onto a sequence of bytes: the [len] bytes of [buffer] found
    from offset [ofs] onwards. *)
type buffer = {
  buffer : MBytes.t ;
  ofs : int ;
  len : int ;
}
|
||||
|
||||
(** A stream of bytes: a window being consumed, followed by a FIFO queue
    of whole chunks that have not been started yet. *)
type t = {
  (* window the next bytes are taken from *)
  current : buffer ;
  (* chunk queue (classical double list implementation): [pending] is
     the front of the queue in reading order, [pending_rev] is the back
     of the queue in reverse order *)
  pending : MBytes.t list ;
  pending_rev : MBytes.t list ;
  (* number of unread bytes in 'current + pending + pending_rev' *)
  unread : int ;
}
|
||||
|
||||
(** [is_empty stream] is [true] when [stream] has no unread byte left. *)
let is_empty stream = stream.unread = 0
|
||||
|
||||
(** [of_buffer current] is the stream whose whole content is the window
    [current]: no chunk is queued and [current.len] bytes are unread. *)
let of_buffer current =
  let unread = current.len in
  { current ; pending = [] ; pending_rev = [] ; unread }
|
||||
|
||||
(** [of_bytes buffer] is the stream made of all the bytes of [buffer],
    from offset [0] up to [MBytes.length buffer]. *)
let of_bytes buffer =
  of_buffer { buffer ; ofs = 0 ; len = MBytes.length buffer }
|
||||
|
||||
(** The stream built from a zero-length byte sequence. *)
let empty = MBytes.create 0 |> of_bytes
|
||||
|
||||
(** [push buffer stream] is [stream] with the chunk [buffer] added at the
    back of its queue; the unread count grows by [MBytes.length buffer]. *)
let push buffer stream =
  let pending_rev = buffer :: stream.pending_rev in
  let unread = stream.unread + MBytes.length buffer in
  { stream with pending_rev ; unread }
|
||||
|
||||
(** Raised by [read] when more bytes are requested than the stream
    currently holds. *)
exception Need_more_data
|
||||
|
||||
(** [split buffer len] cuts the window [buffer] in two: its first [len]
    bytes, and the [buffer.len - len] bytes that follow them. Both results
    keep the same underlying [MBytes.t] as [buffer].

    [len] must lie between [0] and [buffer.len] (inclusive); this is
    checked by an assertion. *)
let split buffer len =
  (* A negative [len] would yield a first window of negative length and a
     remainder longer than [buffer] itself, so the lower bound is checked
     along with the upper one. *)
  assert (0 <= len && len <= buffer.len) ;
  { buffer with len },
  { buffer with ofs = buffer.ofs + len ; len = buffer.len - len }
|
||||
|
||||
(** [read stream len] takes the next [len] bytes off [stream]. It returns
    them as a window, together with the stream that remains after them.

    When the current window holds at least [len] bytes, the result is a
    sub-window of it. Otherwise the bytes are gathered into a freshly
    created buffer of [len] bytes, taken first from the current window
    and then from the queued chunks in the order they were pushed.

    @raise Need_more_data if [len] exceeds the number of unread bytes. *)
let read stream len =
  if len > stream.unread then raise Need_more_data ;
  let unread = stream.unread - len in
  let head = stream.current in
  if len <= head.len then
    let res, current = split head len in
    (res, { stream with current ; unread })
  else begin
    let dst = MBytes.create len in
    let res = { buffer = dst ; ofs = 0 ; len } in
    (* Start with whatever is left of the current window. *)
    MBytes.blit head.buffer head.ofs dst 0 head.len ;
    (* [filled] is the number of bytes of [dst] already written. *)
    let rec fill filled pending_rev pending =
      match pending with
      | [] ->
          (* Front list exhausted: carry on with the back list, put back
             in reading order. *)
          fill filled [] (List.rev pending_rev)
      | chunk :: pending ->
          let chunk_len = MBytes.length chunk in
          let missing = len - filled in
          if missing > chunk_len then begin
            (* The whole chunk is consumed and more is still needed. *)
            MBytes.blit chunk 0 dst filled chunk_len ;
            fill (filled + chunk_len) pending_rev pending
          end else begin
            (* This chunk completes the read; what is left of it becomes
               the new current window. *)
            MBytes.blit chunk 0 dst filled missing ;
            let current =
              { buffer = chunk ; ofs = missing ; len = chunk_len - missing } in
            (res, { current ; pending ; pending_rev ; unread })
          end in
    fill head.len stream.pending_rev stream.pending
  end
|
24
src/lib_stdlib/binary_stream.mli
Normal file
24
src/lib_stdlib/binary_stream.mli
Normal file
@ -0,0 +1,24 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** A stream of bytes, extended with [push] and consumed with [read]. *)
type t
|
||||
|
||||
(** A window onto a sequence of bytes: the [len] bytes of [buffer] found
    from offset [ofs] onwards. *)
type buffer = {
  buffer : MBytes.t ;
  ofs : int ;
  len : int ;
}
|
||||
|
||||
(** Raised by [read] when more bytes are requested than the stream
    currently holds. *)
exception Need_more_data
|
||||
|
||||
(** [is_empty stream] is [true] when [stream] has no unread byte left. *)
val is_empty: t -> bool
|
||||
(** The stream built from a zero-length byte sequence. *)
val empty: t
|
||||
(** [of_buffer buffer] is the stream whose whole content is the window
    [buffer]; its unread count is [buffer.len]. *)
val of_buffer: buffer -> t
|
||||
(** [read stream len] takes the next [len] bytes off [stream] and returns
    them as a window, together with the stream that remains after them.
    @raise Need_more_data if [len] exceeds the number of unread bytes. *)
val read: t -> int -> buffer * t
|
||||
(** [push bytes stream] is [stream] with the chunk [bytes] added at the
    back of its queue. *)
val push: MBytes.t -> t -> t
|
Loading…
Reference in New Issue
Block a user