Shell/baker: inline full operation contents in RPC.

This commit is contained in:
Grégoire Henry 2017-11-13 23:27:19 +01:00 committed by Benjamin Canou
parent 1163c19213
commit 3e39f82bee
22 changed files with 192 additions and 261 deletions

View File

@ -18,12 +18,6 @@ let errors cctxt =
let forge_block_header cctxt header =
call_service0 cctxt Services.forge_block_header header
type operation = Node_rpc_services.operation =
| Blob of Operation.t
| Hash of Operation_hash.t
let operation_encoding = Node_rpc_services.operation_encoding
let inject_block cctxt ?(async = false) ?(force = false) raw operations =
call_err_service0 cctxt Services.inject_block
{ raw ; blocking = not async ; force ; operations }
@ -64,14 +58,14 @@ module Blocks = struct
operations_hash: Operation_list_list_hash.t ;
fitness: MBytes.t list ;
data: MBytes.t ;
operations: Operation_hash.t list list option ;
operations: (Operation_hash.t * Operation.t) list list option ;
protocol: Protocol_hash.t ;
test_network: Context.test_network;
}
type preapply_param = Services.Blocks.preapply_param = {
timestamp: Time.t ;
proto_header: MBytes.t ;
operations: operation list ;
operations: Operation.t list ;
sort_operations: bool ;
}
type preapply_result = Services.Blocks.preapply_result = {

View File

@ -17,16 +17,10 @@ val forge_block_header:
Block_header.t ->
MBytes.t tzresult Lwt.t
type operation =
| Blob of Operation.t
| Hash of Operation_hash.t
val operation_encoding: operation Data_encoding.t
val inject_block:
config ->
?async:bool -> ?force:bool ->
MBytes.t -> operation list list ->
MBytes.t -> Operation.t list list ->
Block_hash.t tzresult Lwt.t
(** [inject_block cctxt ?async ?force raw_block] tries to inject
[raw_block] inside the node. If [?async] is [true], [raw_block]
@ -85,7 +79,7 @@ module Blocks : sig
val pending_operations:
config ->
block ->
(error Prevalidation.preapply_result * Operation_hash.Set.t) tzresult Lwt.t
(error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t) tzresult Lwt.t
type block_info = {
hash: Block_hash.t ;
@ -98,7 +92,7 @@ module Blocks : sig
operations_hash: Operation_list_list_hash.t ;
fitness: MBytes.t list ;
data: MBytes.t ;
operations: Operation_hash.t list list option ;
operations: (Operation_hash.t * Operation.t) list list option ;
protocol: Protocol_hash.t ;
test_network: Context.test_network;
}
@ -130,7 +124,7 @@ module Blocks : sig
?timestamp:Time.t ->
?sort:bool ->
proto_header:MBytes.t ->
operation list -> preapply_result tzresult Lwt.t
Operation.t list -> preapply_result tzresult Lwt.t
end

View File

@ -52,10 +52,7 @@ let assert_valid_operations_hash shell_header operations =
Operation_list_list_hash.compute
(List.map Operation_list_hash.compute
(List.map
(List.map
(function
| Client_node_rpcs.Blob op -> Tezos_data.Operation.hash op
| Hash oph -> oph)) operations)) in
(List.map Tezos_data.Operation.hash) operations)) in
fail_unless
(Operation_list_list_hash.equal
operations_hash shell_header.Tezos_data.Block_header.operations_hash)
@ -74,7 +71,7 @@ let inject_block cctxt
return block_hash
type error +=
| Failed_to_preapply of Client_node_rpcs.operation * error list
| Failed_to_preapply of Tezos_data.Operation.t * error list
let () =
register_error_kind
@ -83,16 +80,13 @@ let () =
~title: "Fail to preapply an operation"
~description: ""
~pp:(fun ppf (op, err) ->
let h =
match op with
| Client_node_rpcs.Hash h -> h
| Blob op -> Tezos_data.Operation.hash op in
let h = Tezos_data.Operation.hash op in
Format.fprintf ppf "@[Failed to preapply %a:@ %a@]"
Operation_hash.pp_short h
pp_print_error err)
Data_encoding.
(obj2
(req "operation" (dynamic_size Client_node_rpcs.operation_encoding))
(req "operation" (dynamic_size Tezos_data.Operation.encoding))
(req "error" Node_rpc_services.Error.encoding))
(function
| Failed_to_preapply (hash, err) -> Some (hash, err)
@ -112,11 +106,13 @@ let forge_block cctxt block
Client_node_rpcs.Blocks.pending_operations
cctxt block >>=? fun (ops, pendings) ->
let ops =
Operation_hash.Set.elements @@
Operation_hash.Set.union
List.map snd @@
Operation_hash.Map.bindings @@
Operation_hash.Map.fold
Operation_hash.Map.add
(Prevalidation.preapply_result_operations ops)
pendings in
return (List.map (fun x -> Client_node_rpcs.Hash x) ops)
return ops
| Some operations -> return operations
end >>=? fun operations ->
begin
@ -177,20 +173,7 @@ let forge_block cctxt block
&& Operation_hash.Map.is_empty result.branch_delayed ) then
let operations =
if not best_effort then operations
else
let map =
List.fold_left
(fun map op ->
match op with
| Client_node_rpcs.Hash _ -> map
| Blob op ->
Operation_hash.Map.add (Tezos_data.Operation.hash op) op map)
Operation_hash.Map.empty operations in
List.map
(fun h ->
try Client_node_rpcs.Blob (Operation_hash.Map.find h map)
with _ -> Client_node_rpcs.Hash h)
result.applied in
else List.map snd result.applied in
inject_block cctxt
?force ~shell_header ~priority ~seed_nonce_hash ~src_sk
[operations]
@ -198,18 +181,15 @@ let forge_block cctxt block
Lwt.return_error @@
Utils.filter_map
(fun op ->
let h =
match op with
| Client_node_rpcs.Hash h -> h
| Blob op -> Tezos_data.Operation.hash op in
let h = Tezos_data.Operation.hash op in
try Some (Failed_to_preapply
(op, Operation_hash.Map.find h result.refused))
(op, snd @@ Operation_hash.Map.find h result.refused))
with Not_found ->
try Some (Failed_to_preapply
(op, Operation_hash.Map.find h result.branch_refused))
(op, snd @@ Operation_hash.Map.find h result.branch_refused))
with Not_found ->
try Some (Failed_to_preapply
(op, Operation_hash.Map.find h result.branch_delayed))
(op, snd @@ Operation_hash.Map.find h result.branch_delayed))
with Not_found -> None)
operations
@ -481,9 +461,10 @@ let mine cctxt state =
Client_node_rpcs.Blocks.pending_operations cctxt.rpc_config
block >>=? fun (res, ops) ->
let operations =
let open Operation_hash.Set in
List.map (fun x -> Client_node_rpcs.Hash x) @@
elements (union ops (Prevalidation.preapply_result_operations res)) in
List.map snd @@
Operation_hash.Map.bindings @@
Operation_hash.Map.(fold add)
ops (Prevalidation.preapply_result_operations res) in
let request = List.length operations in
let proto_header =
forge_faked_proto_header ~priority ~seed_nonce_hash in
@ -527,7 +508,7 @@ let mine cctxt state =
Client_keys.get_key cctxt delegate >>=? fun (_,_,src_sk) ->
inject_block cctxt.rpc_config
~force:true ~shell_header ~priority ~seed_nonce_hash ~src_sk
[List.map (fun h -> Client_node_rpcs.Hash h) operations.applied]
[List.map snd operations.applied]
|> trace_exn (Failure "Error while injecting block") >>=? fun block_hash ->
State.record_block cctxt level block_hash seed_nonce
|> trace_exn (Failure "Error while recording block") >>=? fun () ->

View File

@ -20,7 +20,7 @@ val inject_block:
priority:int ->
seed_nonce_hash:Nonce_hash.t ->
src_sk:secret_key ->
Client_node_rpcs.operation list list ->
Tezos_data.Operation.t list list ->
Block_hash.t tzresult Lwt.t
(** [inject_block cctxt blk ?force ~priority ~timestamp ~fitness
~seed_nonce ~src_sk ops] tries to inject a block in the node. If
@ -29,13 +29,13 @@ val inject_block:
precomputed). [src_sk] is used to sign the block header. *)
type error +=
| Failed_to_preapply of Client_node_rpcs.operation * error list
| Failed_to_preapply of Tezos_data.Operation.t * error list
val forge_block:
Client_rpcs.config ->
Client_proto_rpcs.block ->
?force:bool ->
?operations:Client_node_rpcs.operation list ->
?operations:Tezos_data.Operation.t list ->
?best_effort:bool ->
?sort:bool ->
?timestamp:Time.t ->

View File

@ -764,20 +764,6 @@ let commit_protocol db h p =
Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
return (res <> None)
type operation =
| Blob of Operation.t
| Hash of Operation_hash.t
let resolve_operation net_db = function
| Blob op ->
fail_unless
(Net_id.equal op.shell.net_id (State.Net.id net_db.net_state))
(failure "Inconsistent net_id in operation.") >>=? fun () ->
return op
| Hash oph ->
Raw_operation.Table.read net_db.operation_db.table oph >>=? fun op ->
return op
let watch_block_header { block_input } =
Watcher.create_stream block_input
let watch_operation { operation_input } =

View File

@ -38,13 +38,6 @@ val deactivate: net_db -> unit Lwt.t
val disconnect: net_db -> P2p.Peer_id.t -> unit Lwt.t
type operation =
| Blob of Operation.t
| Hash of Operation_hash.t
val resolve_operation:
net_db -> operation -> Operation.t tzresult Lwt.t
val commit_block:
net_db ->
Block_hash.t ->

View File

@ -54,7 +54,7 @@ type t = {
mainnet_validator: Net_validator.t ;
inject_block:
?force:bool ->
MBytes.t -> Distributed_db.operation list list ->
MBytes.t -> Operation.t list list ->
(Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t ;
inject_operation:
?force:bool -> MBytes.t ->
@ -151,7 +151,7 @@ module RPC = struct
operations_hash: Operation_list_list_hash.t ;
fitness: MBytes.t list ;
data: MBytes.t ;
operations: Operation_hash.t list list option ;
operations: (Operation_hash.t * Operation.t) list list option ;
protocol: Protocol_hash.t ;
test_network: Context.test_network;
}
@ -159,7 +159,9 @@ module RPC = struct
let convert (block: State.Block.t) =
let hash = State.Block.hash block in
let header = State.Block.header block in
State.Block.all_operation_hashes block >>= fun operations ->
State.Block.all_operations block >>= fun operations ->
let operations =
List.map (List.map (fun op -> (Operation.hash op, op))) operations in
State.Block.context block >>= fun context ->
Context.get_protocol context >>= fun protocol ->
Context.get_test_network context >>= fun test_network ->
@ -279,7 +281,9 @@ module RPC = struct
validation_passes = List.length operations ;
operations_hash =
Operation_list_list_hash.compute
(List.map Operation_list_hash.compute operations) ;
(List.map
(fun ops -> Operation_list_hash.compute (List.map fst ops))
operations) ;
operations = Some operations ;
data = MBytes.of_string "" ;
net_id = head_header.shell.net_id ;
@ -323,7 +327,6 @@ module RPC = struct
| ( `Prevalidation | `Test_prevalidation ) as block ->
let validator = get_validator node block in
let pv = Net_validator.prevalidator validator in
let net_db = Net_validator.net_db validator in
let net_state = Net_validator.net_state validator in
Chain.head net_state >>= fun head ->
let head_header = State.Block.header head in
@ -339,9 +342,10 @@ module RPC = struct
head_header.shell.proto_level
else
((head_header.shell.proto_level + 1) mod 256) in
let operation_hashes =
let operation_hashes, operations =
let pv_result, _ = Prevalidator.operations pv in
[ pv_result.applied ] in
[ List.map fst pv_result.applied ],
[ List.map snd pv_result.applied ] in
let operations_hash =
Operation_list_list_hash.compute
(List.map Operation_list_hash.compute operation_hashes) in
@ -361,12 +365,7 @@ module RPC = struct
proto = MBytes.create 0 ;
} ;
operation_hashes = (fun () -> Lwt.return operation_hashes) ;
operations = begin fun () ->
Lwt_list.map_p
(Lwt_list.map_p
(Distributed_db.Operation.read_exn net_db))
operation_hashes
end ;
operations = (fun () -> Lwt.return operations) ;
context ;
})
@ -384,7 +383,7 @@ module RPC = struct
let validator = get_validator node block in
let pv = Net_validator.prevalidator validator in
let { Prevalidation.applied }, _ = Prevalidator.operations pv in
Lwt.return [applied]
Lwt.return [List.map fst applied]
| `Hash hash ->
read_valid_block node hash >>= function
| None -> Lwt.return_nil
@ -403,12 +402,9 @@ module RPC = struct
State.Block.all_operations block
| (`Prevalidation | `Test_prevalidation) as block ->
let validator = get_validator node block in
let net_db = Net_validator.net_db validator in
let pv = Net_validator.prevalidator validator in
let { Prevalidation.applied }, _ = Prevalidator.operations pv in
Lwt_list.map_p
(Distributed_db.Operation.read_exn net_db) applied >>= fun applied ->
Lwt.return [applied]
Lwt.return [List.map snd applied]
| `Hash hash ->
read_valid_block node hash >>= function
| None -> Lwt.return_nil
@ -441,7 +437,7 @@ module RPC = struct
| `Hash h -> begin
get_validator_per_hash node h >>= function
| None ->
Lwt.return (Prevalidation.empty_result, Operation_hash.Set.empty)
Lwt.return (Prevalidation.empty_result, Operation_hash.Map.empty)
| Some validator ->
let net_state = Net_validator.net_state validator in
let prevalidator = Net_validator.prevalidator validator in
@ -482,16 +478,14 @@ module RPC = struct
| None -> Lwt.return (error_exn Not_found)
| Some data -> return data
end >>=? fun predecessor ->
let net_db = Net_validator.net_db node.mainnet_validator in
map_p (Distributed_db.resolve_operation net_db) ops >>=? fun rops ->
Prevalidation.start_prevalidation
~proto_header ~predecessor ~timestamp () >>=? fun validation_state ->
let rops = List.map (fun x -> Operation.hash x, x) rops in
let ops = List.map (fun x -> Operation.hash x, x) ops in
Prevalidation.prevalidate
validation_state ~sort rops >>= fun (validation_state, r) ->
validation_state ~sort ops >>= fun (validation_state, r) ->
let operations_hash =
Operation_list_list_hash.compute
[Operation_list_hash.compute r.applied] in
[Operation_list_hash.compute (List.map fst r.applied)] in
Prevalidation.end_prevalidation
validation_state >>=? fun { fitness ; context } ->
let pred_shell_header = State.Block.shell_header predecessor in

View File

@ -36,7 +36,7 @@ module RPC : sig
val inject_block:
t -> ?force:bool ->
MBytes.t -> Distributed_db.operation list list ->
MBytes.t -> Operation.t list list ->
(Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t
(** [inject_block node ?force bytes] tries to insert [bytes]
(supposedly the serialization of a block header) inside
@ -75,7 +75,7 @@ module RPC : sig
t -> (Operation_hash.t * Operation.t) Lwt_stream.t * Watcher.stopper
val pending_operations:
t -> block -> (error Prevalidation.preapply_result * Operation_hash.Set.t) Lwt.t
t -> block -> (error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t) Lwt.t
val protocols:
t -> Protocol_hash.t list Lwt.t
@ -90,7 +90,7 @@ module RPC : sig
val preapply:
t -> block ->
timestamp:Time.t -> proto_header:MBytes.t ->
sort_operations:bool -> Distributed_db.operation list ->
sort_operations:bool -> Operation.t list ->
(Block_header.shell_header * error Prevalidation.preapply_result) tzresult Lwt.t
val context_dir:

View File

@ -46,21 +46,6 @@ module Error = struct
end
type operation = Distributed_db.operation =
| Blob of Operation.t
| Hash of Operation_hash.t
let operation_encoding =
let open Data_encoding in
union [
case Operation.encoding
(function Blob op -> Some op | Hash _ -> None)
(fun op -> Blob op) ;
case Operation_hash.encoding
(function Hash oph -> Some oph | Blob _ -> None)
(fun oph -> Hash oph) ;
]
module Blocks = struct
type block = [
@ -81,12 +66,16 @@ module Blocks = struct
operations_hash: Operation_list_list_hash.t ;
fitness: MBytes.t list ;
data: MBytes.t ;
operations: Operation_hash.t list list option ;
operations: (Operation_hash.t * Operation.t) list list option ;
protocol: Protocol_hash.t ;
test_network: Context.test_network;
}
let block_info_encoding =
let operation_encoding =
merge_objs
(obj1 (req "hash" Operation_hash.encoding))
Operation.encoding in
conv
(fun { hash ; net_id ; level ; proto_level ; predecessor ;
fitness ; timestamp ; protocol ;
@ -110,7 +99,7 @@ module Blocks = struct
(merge_objs
(obj4
(req "hash" Block_hash.encoding)
(opt "operations" (list (list Operation_hash.encoding)))
(opt "operations" (dynamic_size (list (dynamic_size (list (dynamic_size operation_encoding))))))
(req "protocol" Protocol_hash.encoding)
(dft "test_network"
Context.test_network_encoding Context.Not_running))
@ -256,6 +245,10 @@ module Blocks = struct
RPC.Path.(block_path / "test_network")
let pending_operations =
let operation_encoding =
merge_objs
(obj1 (req "hash" Operation_hash.encoding))
Operation.encoding in
(* TODO: branch_delayed/... *)
RPC.service
~description:
@ -263,32 +256,18 @@ module Blocks = struct
~input: empty
~output:
(conv
(fun ({ Prevalidation.applied; branch_delayed ; branch_refused },
unprocessed) ->
(applied,
Operation_hash.Map.bindings branch_delayed,
Operation_hash.Map.bindings branch_refused,
Operation_hash.Set.elements unprocessed))
(fun (applied, branch_delayed, branch_refused, unprocessed) ->
({ Prevalidation.applied ; refused = Operation_hash.Map.empty ;
branch_refused =
List.fold_right
(fun (k, o) -> Operation_hash.Map.add k o)
branch_refused Operation_hash.Map.empty ;
branch_delayed =
List.fold_right
(fun (k, o) -> Operation_hash.Map.add k o)
branch_delayed Operation_hash.Map.empty ;
},
List.fold_right Operation_hash.Set.add
unprocessed Operation_hash.Set.empty))
(obj4
(req "applied" (list Operation_hash.encoding))
(req "branch_delayed"
(list (tup2 Operation_hash.encoding Error.encoding)))
(req "branch_refused"
(list (tup2 Operation_hash.encoding Error.encoding)))
(req "unprocessed" (list Operation_hash.encoding))))
(fun (preapplied, unprocessed) ->
({ preapplied with Prevalidation.refused = Operation_hash.Map.empty },
Operation_hash.Map.bindings unprocessed))
(fun (preapplied, unprocessed) ->
(preapplied,
List.fold_right
(fun (h, op) m -> Operation_hash.Map.add h op m)
unprocessed Operation_hash.Map.empty))
(merge_objs
(dynamic_size
(Prevalidation.preapply_result_encoding Error.encoding))
(obj1 (req "unprocessed" (list (dynamic_size operation_encoding))))))
RPC.Path.(block_path / "pending_operations")
let proto_path =
@ -297,7 +276,7 @@ module Blocks = struct
type preapply_param = {
timestamp: Time.t ;
proto_header: MBytes.t ;
operations: operation list ;
operations: Operation.t list ;
sort_operations: bool ;
}
@ -310,7 +289,7 @@ module Blocks = struct
(obj4
(req "timestamp" Time.encoding)
(req "proto_header" bytes)
(req "operations" (list (dynamic_size operation_encoding)))
(req "operations" (list (dynamic_size Operation.encoding)))
(dft "sort_operations" bool false)))
type preapply_result = {
@ -623,7 +602,7 @@ type inject_block_param = {
raw: MBytes.t ;
blocking: bool ;
force: bool ;
operations: operation list list ;
operations: Operation.t list list ;
}
let inject_block_param =
@ -651,7 +630,7 @@ let inject_block_param =
(req "operations"
(describe
~description:"..."
(list (list (dynamic_size operation_encoding))))))
(list (list (dynamic_size Operation.encoding))))))
let inject_block =
RPC.service

View File

@ -13,12 +13,6 @@ module Error : sig
val wrap: 'a Data_encoding.t -> 'a tzresult Data_encoding.encoding
end
type operation = Distributed_db.operation =
| Blob of Operation.t
| Hash of Operation_hash.t
val operation_encoding: operation Data_encoding.t
module Blocks : sig
type block = [
@ -43,7 +37,7 @@ module Blocks : sig
operations_hash: Operation_list_list_hash.t ;
fitness: MBytes.t list ;
data: MBytes.t ;
operations: Operation_hash.t list list option ;
operations: (Operation_hash.t * Operation.t) list list option ;
protocol: Protocol_hash.t ;
test_network: Context.test_network;
}
@ -79,7 +73,7 @@ module Blocks : sig
(unit, unit * block, unit, Context.test_network) RPC.service
val pending_operations:
(unit, unit * block, unit,
error Prevalidation.preapply_result * Hash.Operation_hash.Set.t) RPC.service
error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t) RPC.service
type list_param = {
include_ops: bool ;
@ -96,7 +90,7 @@ module Blocks : sig
type preapply_param = {
timestamp: Time.t ;
proto_header: MBytes.t ;
operations: operation list ;
operations: Operation.t list ;
sort_operations: bool ;
}
@ -183,7 +177,7 @@ type inject_block_param = {
raw: MBytes.t ;
blocking: bool ;
force: bool ;
operations: operation list list ;
operations: Operation.t list list ;
}
val inject_block:

View File

@ -32,8 +32,8 @@ type t = {
notify_bootstrapped: unit -> unit ;
mutable bootstrapped: bool ;
mutable last_validated_head: Block_hash.t ;
mutable last_advertised_head: Block_hash.t ;
mutable last_validated_head: Block_header.t ;
mutable last_advertised_head: Block_header.t ;
mutable worker: unit Lwt.t ;
dropbox: msg Lwt_dropbox.t ;
@ -130,7 +130,7 @@ let may_validate_new_head pv hash header =
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
set_bootstrapped pv ;
pv.last_validated_head <- hash ;
pv.last_validated_head <- header ;
return ()
| false ->
lwt_log_info
@ -246,9 +246,10 @@ let create
let canceler = Canceler.create () in
let dropbox = Lwt_dropbox.create () in
let net_state = Distributed_db.net_state net_db in
let genesis = (State.Net.genesis net_state).block in
State.Block.read_exn net_state
(State.Net.genesis net_state).block >>= fun genesis ->
let rec notify_new_block block =
pv.last_validated_head <- State.Block.hash block ;
pv.last_validated_head <- State.Block.header block ;
external_notify_new_block block
and pv = {
block_validator ;
@ -261,8 +262,8 @@ let create
net_db ;
peer_id ;
bootstrapped = false ;
last_validated_head = genesis ;
last_advertised_head = genesis ;
last_validated_head = State.Block.header genesis ;
last_advertised_head = State.Block.header genesis ;
canceler ;
dropbox ;
worker = Lwt.return_unit ;
@ -282,15 +283,17 @@ let create
Lwt.return pv
let notify_branch pv locator =
let head, _ = (locator : Block_locator.t :> _ * _) in
let hash = Block_header.hash head in
pv.last_advertised_head <- hash ;
let header, _ = (locator : Block_locator.t :> _ * _) in
let hash = Block_header.hash header in
(* TODO penalize decreasing fitness *)
pv.last_advertised_head <- header ;
try Lwt_dropbox.put pv.dropbox (New_branch (hash, locator))
with Lwt_dropbox.Closed -> ()
let notify_head pv header =
let hash = Block_header.hash header in
pv.last_advertised_head <- hash ;
pv.last_advertised_head <- header ;
(* TODO penalize decreasing fitness *)
match Lwt_dropbox.peek pv.dropbox with
| Some (New_branch _) -> () (* ignore *)
| None | Some (New_head _) ->

View File

@ -11,7 +11,7 @@ type t
val peer_id: t -> P2p.Peer_id.t
val bootstrapped: t -> bool
val current_head: t -> Block_hash.t
val current_head: t -> Block_header.t
val create:
?notify_new_block: (State.Block.t -> unit) ->

View File

@ -9,10 +9,10 @@
type 'error preapply_result = {
applied: Operation_hash.t list;
refused: 'error list Operation_hash.Map.t;
branch_refused: 'error list Operation_hash.Map.t;
branch_delayed: 'error list Operation_hash.Map.t;
applied: (Operation_hash.t * Operation.t) list;
refused: (Operation.t * 'error list) Operation_hash.Map.t;
branch_refused: (Operation.t * 'error list) Operation_hash.Map.t;
branch_delayed: (Operation.t * 'error list) Operation_hash.Map.t;
}
let empty_result = {
@ -31,7 +31,16 @@ let map_result f r = {
let preapply_result_encoding error_encoding =
let open Data_encoding in
let refused_encoding = tup2 Operation_hash.encoding error_encoding in
let operation_encoding =
merge_objs
(obj1 (req "hash" Operation_hash.encoding))
(dynamic_size Operation.encoding) in
let refused_encoding =
merge_objs
(obj1 (req "hash" Operation_hash.encoding))
(merge_objs
(dynamic_size Operation.encoding)
(obj1 (req "error" error_encoding))) in
let build_list map = Operation_hash.Map.bindings map in
let build_map list =
List.fold_right
@ -47,7 +56,7 @@ let preapply_result_encoding error_encoding =
let branch_delayed = build_map branch_delayed in
{ applied ; refused ; branch_refused ; branch_delayed })
(obj4
(req "applied" (list Operation_hash.encoding))
(req "applied" (list operation_encoding))
(req "refused" (list refused_encoding))
(req "branch_refused" (list refused_encoding))
(req "branch_delayed" (list refused_encoding)))
@ -55,15 +64,15 @@ let preapply_result_encoding error_encoding =
let preapply_result_operations t =
let ops =
List.fold_left
(fun acc x -> Operation_hash.Set.add x acc)
Operation_hash.Set.empty t.applied in
(fun acc (h, op) -> Operation_hash.Map.add h op acc)
Operation_hash.Map.empty t.applied in
let ops =
Operation_hash.Map.fold
(fun x _ acc -> Operation_hash.Set.add x acc)
(fun h (op, _err) acc -> Operation_hash.Map.add h op acc)
t.branch_delayed ops in
let ops =
Operation_hash.Map.fold
(fun x _ acc -> Operation_hash.Set.add x acc)
(fun h (op, _err) acc -> Operation_hash.Map.add h op acc)
t.branch_refused ops in
ops
@ -75,24 +84,24 @@ let empty_result =
let rec apply_operations apply_operation state r ~sort ops =
Lwt_list.fold_left_s
(fun (state, r) (hash, op) ->
apply_operation state op >>= function
(fun (state, r) (hash, op, parsed_op) ->
apply_operation state parsed_op >>= function
| Ok state ->
let applied = hash :: r.applied in
Lwt.return (state, { r with applied} )
let applied = (hash, op) :: r.applied in
Lwt.return (state, { r with applied } )
| Error errors ->
match classify_errors errors with
| `Branch ->
let branch_refused =
Operation_hash.Map.add hash errors r.branch_refused in
Operation_hash.Map.add hash (op, errors) r.branch_refused in
Lwt.return (state, { r with branch_refused })
| `Permanent ->
let refused =
Operation_hash.Map.add hash errors r.refused in
Operation_hash.Map.add hash (op, errors) r.refused in
Lwt.return (state, { r with refused })
| `Temporary ->
let branch_delayed =
Operation_hash.Map.add hash errors r.branch_delayed in
Operation_hash.Map.add hash (op, errors) r.branch_delayed in
Lwt.return (state, { r with branch_delayed }))
(state, r)
ops >>= fun (state, r) ->
@ -100,7 +109,7 @@ let rec apply_operations apply_operation state r ~sort ops =
| _ :: _ when sort ->
let rechecked_operations =
List.filter
(fun (hash, _) -> Operation_hash.Map.mem hash r.branch_delayed)
(fun (hash, _, _) -> Operation_hash.Map.mem hash r.branch_delayed)
ops in
let remaining = List.length rechecked_operations in
if remaining = 0 || remaining = List.length ops then
@ -155,25 +164,25 @@ type error += Parse_error
let prevalidate
(State { proto = (module Proto) ; state })
~sort ops =
~sort (ops : (Operation_hash.t * Operation.t) list)=
let ops =
List.map
(fun (h, op) ->
(h, Proto.parse_operation h op |> record_trace Parse_error))
(h, op, Proto.parse_operation h op |> record_trace Parse_error))
ops in
let invalid_ops =
Utils.filter_map
(fun (h, op) -> match op with
(fun (h, op, parsed_op) -> match parsed_op with
| Ok _ -> None
| Error err -> Some (h, err)) ops
| Error err -> Some (h, op, err)) ops
and parsed_ops =
Utils.filter_map
(fun (h, op) -> match op with
| Ok op -> Some (h, op)
(fun (h, op, parsed_op) -> match parsed_op with
| Ok parsed_op -> Some (h, op, parsed_op)
| Error _ -> None) ops in
let sorted_ops =
if sort then
let compare (_, op1) (_, op2) = Proto.compare_operations op1 op2 in
let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in
List.sort compare parsed_ops
else parsed_ops in
apply_operations
@ -184,7 +193,7 @@ let prevalidate
applied = List.rev r.applied ;
branch_refused =
List.fold_left
(fun map (h, err) -> Operation_hash.Map.add h err map)
(fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map)
r.branch_refused invalid_ops } in
Lwt.return (State { proto = (module Proto) ; state }, r)

View File

@ -8,19 +8,19 @@
(**************************************************************************)
type 'error preapply_result = {
applied: Operation_hash.t list;
refused: 'error list Operation_hash.Map.t;
applied: (Operation_hash.t * Operation.t) list;
refused: (Operation.t * 'error list) Operation_hash.Map.t;
(* e.g. invalid signature *)
branch_refused: 'error list Operation_hash.Map.t;
branch_refused: (Operation.t * 'error list) Operation_hash.Map.t;
(* e.g. insufficent balance *)
branch_delayed: 'error list Operation_hash.Map.t;
branch_delayed: (Operation.t * 'error list) Operation_hash.Map.t;
(* e.g. timestamp in the future *)
}
val empty_result : 'error preapply_result
val preapply_result_operations :
'error preapply_result -> Operation_hash.Set.t
'error preapply_result -> Operation.t Operation_hash.Map.t
val preapply_result_encoding :
'error list Data_encoding.t ->

View File

@ -24,7 +24,7 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool =
Distributed_db.inject_operation net_db h op >>= fun _ ->
Lwt.return_unit
end >>= fun () ->
Lwt.return (Operation_hash.Set.add h mempool)))
Lwt.return (Operation_hash.Map.add h op mempool)))
mempool operations >>= fun mempool ->
State.Block.predecessor block >>= function
| None -> assert false
@ -35,7 +35,7 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool =
iter_option maintain_net_db
~f:(fun net_db -> Distributed_db.clear_operations net_db operations) ;
List.fold_left
(List.fold_left (fun mempool h -> Operation_hash.Set.remove h mempool))
(List.fold_left (fun mempool h -> Operation_hash.Map.remove h mempool))
mempool operations
in
Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, path) ->
@ -57,8 +57,8 @@ type t = {
prevalidate_operations:
bool -> Operation.t list ->
(Operation_hash.t list * error preapply_result) tzresult Lwt.t ;
operations: unit -> error preapply_result * Operation_hash.Set.t ;
pending: ?block:State.Block.t -> unit -> Operation_hash.Set.t Lwt.t ;
operations: unit -> error preapply_result * Operation.t Operation_hash.Map.t ;
pending: ?block:State.Block.t -> unit -> Operation.t Operation_hash.Map.t Lwt.t ;
timestamp: unit -> Time.t ;
context: unit -> Updater.validation_result tzresult Lwt.t ;
shutdown: unit -> unit Lwt.t ;
@ -84,6 +84,7 @@ let create
(start_prevalidation ~predecessor:head ~timestamp:!timestamp () >|= ref) >>= fun validation_state ->
let pending = Operation_hash.Table.create 53 in
let head = ref head in
let mempool = ref [] in
let operations = ref empty_result in
Chain_traversal.live_blocks
!head
@ -92,7 +93,7 @@ let create
let live_blocks = ref live_blocks in
let live_operations = ref live_operations in
let running_validation = ref Lwt.return_unit in
let unprocessed = ref Operation_hash.Set.empty in
let unprocessed = ref Operation_hash.Map.empty in
let broadcast_unprocessed = ref false in
let set_validation_state state =
@ -108,24 +109,30 @@ let create
Distributed_db.Advertise.current_head net_db ~mempool:ops !head in
let handle_unprocessed () =
if Operation_hash.Set.is_empty !unprocessed then
if Operation_hash.Map.is_empty !unprocessed then
Lwt.return ()
else
let ops = !unprocessed in
let broadcast = !broadcast_unprocessed in
unprocessed := Operation_hash.Set.empty ;
unprocessed := Operation_hash.Map.empty ;
broadcast_unprocessed := false ;
let ops = Operation_hash.Set.diff ops !live_operations in
live_operations := Operation_hash.Set.(fold add) !live_operations ops ;
let ops =
Operation_hash.Set.fold
(fun k m -> Operation_hash.Map.remove k m)
!live_operations ops in
live_operations :=
Operation_hash.Map.fold
(fun k _ m -> Operation_hash.Set.add k m)
ops !live_operations ;
running_validation := begin
begin
Lwt_list.filter_map_p
(fun h ->
Distributed_db.Operation.read_opt net_db h >>= function
| Some po when Block_hash.Set.mem po.shell.branch !live_blocks ->
Lwt.return_some (h, po)
| Some _ | None -> Lwt.return_none)
(Operation_hash.Set.elements ops) >>= fun rops ->
(fun (h, op) ->
if Block_hash.Set.mem op.Operation.shell.branch !live_blocks then
Lwt.return_some (h, op)
else
Lwt.return_none)
(Operation_hash.Map.bindings ops) >>= fun rops ->
(Lwt.return !validation_state >>=? fun validation_state ->
(prevalidate validation_state ~sort:true rops >>= return)) >>= function
| Ok (state, r) -> Lwt.return (Ok state, r)
@ -133,13 +140,15 @@ let create
let r =
{ empty_result with
branch_delayed =
Operation_hash.Set.fold
(fun op m -> Operation_hash.Map.add op err m)
Operation_hash.Map.fold
(fun h op m -> Operation_hash.Map.add h (op, err) m)
ops Operation_hash.Map.empty ; } in
Lwt.return (!validation_state, r)
end >>= fun (state, r) ->
let filter_out s m =
List.fold_right Operation_hash.Map.remove s m in
List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in
let new_ops = List.map fst r.applied in
mempool := List.rev_append new_ops !mempool ;
operations := {
applied = List.rev_append r.applied !operations.applied ;
refused = Operation_hash.Map.empty ;
@ -153,8 +162,8 @@ let create
(filter_out r.applied !operations.branch_delayed)
r.branch_delayed ;
} ;
Chain.set_reversed_mempool net_state !operations.applied >>= fun () ->
if broadcast then broadcast_operation r.applied ;
Chain.set_reversed_mempool net_state !mempool >>= fun () ->
if broadcast then broadcast_operation new_ops ;
Lwt_list.iter_s
(fun (_op, _exns) ->
(* FIXME *)
@ -194,31 +203,31 @@ let create
Lwt.return !validation_state >>=? fun validation_state ->
prevalidate validation_state
~sort:true rops >>= fun (state, res) ->
let register h =
let op = Operation_hash.Map.find h ops in
live_operations := Operation_hash.Set.add h !live_operations ;
let register h op =
live_operations :=
Operation_hash.Set.add h !live_operations ;
Distributed_db.inject_operation
net_db h op >>=? fun (_ : bool) ->
return () in
iter_s
(fun h ->
register h >>=? fun () ->
(fun (h, op) ->
register h op >>=? fun () ->
mempool := h :: !mempool ;
operations :=
{ !operations with
applied = h :: !operations.applied };
applied = (h, op) :: !operations.applied };
return () )
res.applied >>=? fun () ->
Chain.set_reversed_mempool
net_state !operations.applied >>= fun () ->
broadcast_operation res.applied ;
Chain.set_reversed_mempool net_state !mempool >>= fun () ->
broadcast_operation (List.map fst res.applied) ;
begin
if force then
iter_p
(fun (h, _exns) -> register h)
(fun (h, (op, _exns)) -> register h op)
(Operation_hash.Map.bindings
res.branch_delayed) >>=? fun () ->
iter_p
(fun (h, _exns) -> register h)
(fun (h, (op, _exns)) -> register h op)
(Operation_hash.Map.bindings
res.branch_refused) >>=? fun () ->
operations :=
@ -252,8 +261,8 @@ let create
Distributed_db.Operation.fetch
~timeout:operation_timeout
net_db ~peer:gid h () >>= function
| Ok _op ->
push_to_worker (`Handle h) ;
| Ok op ->
push_to_worker (`Handle (h, op)) ;
Lwt.return_unit
| Error [ Distributed_db.Operation.Canceled _ ] ->
lwt_debug
@ -276,12 +285,11 @@ let create
net_db ~peer:gid op ()))
known_ops ;
Lwt.return_unit
| `Handle op ->
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
Operation_hash.Table.remove pending op ;
| `Handle (h, op) ->
Operation_hash.Table.remove pending h ;
broadcast_unprocessed := true ;
unprocessed := Operation_hash.Set.singleton op ;
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
unprocessed := Operation_hash.Map.singleton h op ;
lwt_debug "register %a" Operation_hash.pp_short h >>= fun () ->
Lwt.return_unit
| `Flush (new_head : State.Block.t) ->
list_pendings
@ -294,9 +302,10 @@ let create
>>= fun (new_live_blocks, new_live_operations) ->
lwt_debug "flush %a (mempool: %d)"
Block_hash.pp_short (State.Block.hash new_head)
(Operation_hash.Set.cardinal new_mempool) >>= fun () ->
(Operation_hash.Map.cardinal new_mempool) >>= fun () ->
(* Reset the pre-validation context *)
head := new_head ;
mempool := [] ;
operations := empty_result ;
broadcast_unprocessed := false ;
unprocessed := new_mempool ;
@ -376,7 +385,7 @@ let inject_operation pv ?(force = false) (op: Operation.t) =
let net_id = State.Net.id (Distributed_db.net_state pv.net_db) in
let wrap_error h map =
begin
try return (Operation_hash.Map.find h map)
try return (snd (Operation_hash.Map.find h map))
with Not_found ->
failwith "unexpected protocol result"
end >>=? fun errors ->
@ -385,7 +394,7 @@ let inject_operation pv ?(force = false) (op: Operation.t) =
(failure
"Prevalidator.inject_operation: invalid network") >>=? fun () ->
pv.prevalidate_operations force [op] >>=? function
| ([h], { applied = [h'] }) when Operation_hash.equal h h' ->
| ([h], { applied = [h', _] }) when Operation_hash.equal h h' ->
return ()
| ([h], { refused })
when Operation_hash.Map.cardinal refused = 1 ->

View File

@ -44,7 +44,7 @@ val inject_operation: t -> ?force:bool -> Operation.t -> unit tzresult Lwt.t
val flush: t -> State.Block.t -> unit
val timestamp: t -> Time.t
val operations: t -> error Prevalidation.preapply_result * Operation_hash.Set.t
val operations: t -> error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t
val context: t -> Updater.validation_result tzresult Lwt.t
val pending: ?block:State.Block.t -> t -> Operation_hash.Set.t Lwt.t
val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t

View File

@ -62,10 +62,7 @@ let inject_block v ?force bytes operations =
| None -> failwith "Cannot parse block header."
| Some block ->
get v block.shell.net_id >>=? fun nv ->
(* TODO... remove `Distributed_db.operation`
and only accept raw operations ??? *)
let validation =
map_p (map_p (Distributed_db.resolve_operation (Net_validator.net_db nv))) operations >>=? fun operations ->
Net_validator.validate_block nv ?force hash block operations in
return (hash, validation)

View File

@ -26,7 +26,7 @@ val get_exn: t -> Net_id.t -> Net_validator.t Lwt.t
val inject_block:
t ->
?force:bool ->
MBytes.t -> Distributed_db.operation list list ->
MBytes.t -> Operation.t list list ->
(Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t
val watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper

View File

@ -104,6 +104,7 @@ module Make() = struct
!error_kinds in
let json_encoding = Data_encoding.union cases in
let encoding =
Data_encoding.dynamic_size @@
Data_encoding.splitted
~json:json_encoding
~binary:

View File

@ -325,9 +325,7 @@ module Assert = struct
List.exists f errors
| _ -> false
let hash = function
| Client_node_rpcs.Hash h -> h
| Blob op -> Tezos_data.Operation.hash op
let hash op = Tezos_data.Operation.hash op
let failed_to_preapply ~msg ?op f =
Assert.contain_error ~msg ~f:begin function
@ -426,7 +424,6 @@ end
module Baking = struct
let mine block (contract: Account.t) operations =
let operations = List.map (fun op -> Client_node_rpcs.Blob op) operations in
let seed_nonce =
match Nonce.of_bytes @@
Sodium.Random.Bigbytes.generate Constants.nonce_length with

View File

@ -156,7 +156,7 @@ module Assert : sig
val failed_to_preapply:
msg:string ->
?op:Client_node_rpcs.operation ->
?op:Tezos_data.Operation.t ->
(Environment.Error_monad.error ->
bool) ->
'a tzresult -> unit

View File

@ -84,13 +84,13 @@ let test_wrong_delegate ~miner contract block =
let test_invalid_endorsement_slot contract block =
Helpers.Endorse.endorse ~slot:~-1 contract block >>=? fun op ->
Helpers.Baking.mine block contract [ op ] >>= fun res ->
Assert.failed_to_preapply ~msg:__LOC__ ~op:(Blob op) begin function
Assert.failed_to_preapply ~msg:__LOC__ ~op begin function
| Baking.Invalid_endorsement_slot _ -> true
| _ -> false
end res ;
Helpers.Endorse.endorse ~slot:16 contract block >>=? fun op ->
Helpers.Baking.mine block contract [ op ] >>= fun res ->
Assert.failed_to_preapply ~msg:__LOC__ ~op:(Blob op) begin function
Assert.failed_to_preapply ~msg:__LOC__ ~op begin function
| Baking.Invalid_endorsement_slot _ -> true
| _ -> false
end res ;