ligo/src/node/shell/prevalidator.ml

382 lines
15 KiB
OCaml
Raw Normal View History

2016-09-08 21:13:10 +04:00
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Logging.Node.Prevalidator
2016-09-08 21:13:10 +04:00
let list_pendings ~from_block ~to_block old_mempool =
let rec pop_blocks ancestor block mempool =
let hash = State.Block.hash block in
if Block_hash.equal hash ancestor then
Lwt.return mempool
else
State.Block.all_operation_hashes block >>= fun operations ->
let mempool =
List.fold_left
(List.fold_left (fun mempool h -> Operation_hash.Set.add h mempool))
mempool operations in
State.Block.predecessor block >>= function
| None -> assert false
| Some predecessor -> pop_blocks ancestor predecessor mempool
in
let push_block mempool block =
State.Block.all_operation_hashes block >|= fun operations ->
List.fold_left
(List.fold_left (fun mempool h -> Operation_hash.Set.remove h mempool))
mempool operations
in
Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, path) ->
pop_blocks
(State.Block.hash ancestor)
from_block old_mempool >>= fun mempool ->
Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool ->
Lwt.return new_mempool
2016-09-08 21:13:10 +04:00
(** Worker *)
exception Invalid_operation of Operation_hash.t
open Prevalidation
2016-09-08 21:13:10 +04:00
type t = {
net_db: Distributed_db.net_db ;
flush: State.Block.t -> unit;
2017-03-04 12:53:44 +04:00
notify_operations: P2p.Peer_id.t -> Operation_hash.t list -> unit ;
2016-09-08 21:13:10 +04:00
prevalidate_operations:
bool -> Operation.t list ->
(Operation_hash.t list * error preapply_result) tzresult Lwt.t ;
operations: unit -> error preapply_result * Operation_hash.Set.t ;
pending: ?block:State.Block.t -> unit -> Operation_hash.Set.t Lwt.t ;
2016-09-08 21:13:10 +04:00
timestamp: unit -> Time.t ;
context: unit -> Updater.validation_result tzresult Lwt.t ;
2016-09-08 21:13:10 +04:00
shutdown: unit -> unit Lwt.t ;
}
let merge _key a b =
match a, b with
| None, None -> None
| Some x, None -> Some x
| _, Some y -> Some y
let create net_db =
2016-09-08 21:13:10 +04:00
let net_state = Distributed_db.net_state net_db in
2016-09-08 21:13:10 +04:00
let cancelation, cancel, _on_cancel = Lwt_utils.canceler () in
let push_to_worker, worker_waiter = Lwt_utils.queue () in
Chain.head net_state >>= fun head ->
2016-09-08 21:13:10 +04:00
let timestamp = ref (Time.now ()) in
(start_prevalidation head !timestamp () >|= ref) >>= fun validation_state ->
2017-03-04 12:53:44 +04:00
let pending = Operation_hash.Table.create 53 in
let head = ref head in
let operations = ref empty_result in
Chain_traversal.live_blocks
!head
(State.Block.max_operations_ttl !head)
>>= fun (live_blocks, live_operations) ->
let live_blocks = ref live_blocks in
let live_operations = ref live_operations in
2016-09-08 21:13:10 +04:00
let running_validation = ref Lwt.return_unit in
let unprocessed = ref Operation_hash.Set.empty in
2016-09-08 21:13:10 +04:00
let broadcast_unprocessed = ref false in
let set_validation_state state =
validation_state := state;
Lwt.return_unit in
let reset_validation_state head timestamp =
start_prevalidation head timestamp () >>= fun state ->
validation_state := state;
2016-09-08 21:13:10 +04:00
Lwt.return_unit in
let broadcast_operation ops =
let hash = State.Block.hash !head in
Distributed_db.broadcast_head net_db hash ops in
2016-09-08 21:13:10 +04:00
let handle_unprocessed () =
if Operation_hash.Set.is_empty !unprocessed then
2016-09-08 21:13:10 +04:00
Lwt.return ()
else
let ops = !unprocessed in
let broadcast = !broadcast_unprocessed in
unprocessed := Operation_hash.Set.empty ;
2016-09-08 21:13:10 +04:00
broadcast_unprocessed := false ;
let ops = Operation_hash.Set.diff ops !live_operations in
live_operations := Operation_hash.Set.(fold add) !live_operations ops ;
2016-09-08 21:13:10 +04:00
running_validation := begin
begin
Lwt_list.filter_map_p
(fun h ->
Distributed_db.Operation.read_opt net_db h >>= function
| Some po when Block_hash.Set.mem po.shell.branch !live_blocks ->
Lwt.return_some (h, po)
| Some _ | None -> Lwt.return_none)
(Operation_hash.Set.elements ops) >>= fun rops ->
(Lwt.return !validation_state >>=? fun validation_state ->
(prevalidate validation_state ~sort:true rops >>= return)) >>= function
| Ok (state, r) -> Lwt.return (Ok state, r)
2016-09-08 21:13:10 +04:00
| Error err ->
let r =
{ empty_result with
2016-09-08 21:13:10 +04:00
branch_delayed =
Operation_hash.Set.fold
(fun op m -> Operation_hash.Map.add op err m)
ops Operation_hash.Map.empty ; } in
Lwt.return (!validation_state, r)
end >>= fun (state, r) ->
2016-09-08 21:13:10 +04:00
let filter_out s m =
List.fold_right Operation_hash.Map.remove s m in
2016-09-08 21:13:10 +04:00
operations := {
applied = List.rev_append r.applied !operations.applied ;
refused = Operation_hash.Map.empty ;
branch_refused =
Operation_hash.Map.merge merge
(* filter_out should not be required here, TODO warn ? *)
(filter_out r.applied !operations.branch_refused)
r.branch_refused ;
branch_delayed =
Operation_hash.Map.merge merge
(filter_out r.applied !operations.branch_delayed)
r.branch_delayed ;
2016-09-08 21:13:10 +04:00
} ;
Chain.set_reversed_mempool net_state !operations.applied >>= fun () ->
if broadcast then broadcast_operation r.applied ;
2016-09-08 21:13:10 +04:00
Lwt_list.iter_s
(fun (_op, _exns) ->
(* FIXME *)
(* Distributed_db.Operation.mark_invalid net_db op exns >>= fun _ -> *)
2016-09-08 21:13:10 +04:00
Lwt.return_unit)
(Operation_hash.Map.bindings r.refused) >>= fun () ->
2016-09-08 21:13:10 +04:00
(* TODO. Keep a bounded set of 'refused' operations. *)
(* TODO. Log the error in some statistics associated to
the peers that informed us of the operations. And
eventually blacklist bad peers. *)
(* TODO. Keep a bounded set of 'branch_refused' operations
into the 'state'. It should be associated to the
current block, and updated on 'set_current_head'. *)
set_validation_state state
2016-09-08 21:13:10 +04:00
end;
Lwt.catch
(fun () -> !running_validation)
(fun _ -> lwt_debug "<- prevalidate (cancel)")
in
let prevalidation_worker =
let rec worker_loop () =
(* TODO lookup in `!pending` for 'outdated' ops and re-add them
in `unprocessed` (e.g. if the previous tentative was
more 5 seconds ago) *)
handle_unprocessed () >>= fun () ->
Lwt.pick [(worker_waiter () >|= fun q -> `Process q);
(cancelation () >|= fun () -> `Cancel)] >>= function
| `Cancel -> Lwt.return_unit
| `Process q ->
Lwt_list.iter_s
(function
| `Prevalidate (ops, w, force) -> begin
let result =
let rops = Operation_hash.Map.bindings ops in
Lwt.return !validation_state >>=? fun validation_state ->
prevalidate validation_state
~sort:true rops >>= fun (state, res) ->
2016-09-08 21:13:10 +04:00
let register h =
let op = Operation_hash.Map.find h ops in
2017-08-04 20:43:13 +04:00
live_operations := Operation_hash.Set.add h !live_operations ;
Distributed_db.inject_operation
net_db h op >>=? fun (_ : bool) ->
return () in
iter_s
2016-09-08 21:13:10 +04:00
(fun h ->
register h >>=? fun () ->
2016-09-08 21:13:10 +04:00
operations :=
{ !operations with
applied = h :: !operations.applied };
return () )
res.applied >>=? fun () ->
Chain.set_reversed_mempool
net_state !operations.applied >>= fun () ->
broadcast_operation res.applied ;
2016-09-08 21:13:10 +04:00
begin
if force then
iter_p
2016-09-08 21:13:10 +04:00
(fun (h, _exns) -> register h)
(Operation_hash.Map.bindings
res.branch_delayed) >>=? fun () ->
iter_p
2016-09-08 21:13:10 +04:00
(fun (h, _exns) -> register h)
(Operation_hash.Map.bindings
res.branch_refused) >>=? fun () ->
2016-09-08 21:13:10 +04:00
operations :=
{ !operations with
branch_delayed =
Operation_hash.Map.merge merge
2016-09-08 21:13:10 +04:00
!operations.branch_delayed res.branch_delayed ;
branch_refused =
Operation_hash.Map.merge merge
2016-09-08 21:13:10 +04:00
!operations.branch_refused res.branch_refused ;
} ;
return ()
2016-09-08 21:13:10 +04:00
else
return ()
end >>=? fun () ->
set_validation_state (Ok state) >>= fun () ->
2016-09-08 21:13:10 +04:00
return res
in
result >>= fun result ->
Lwt.wakeup w result ;
Lwt.return_unit
end
2017-03-04 12:53:44 +04:00
| `Register (gid, ops) ->
let known_ops, unknown_ops =
List.partition
(fun op ->
Operation_hash.Table.mem pending op
|| Operation_hash.Set.mem op !live_operations)
ops in
2017-03-04 12:53:44 +04:00
let fetch op =
Distributed_db.Operation.fetch
~timeout:10. (* TODO allow to adjust the constant ... *)
net_db ~peer:gid op () >>= fun _op ->
2017-03-04 12:53:44 +04:00
push_to_worker (`Handle op) ;
Lwt.return_unit
in
List.iter
(fun op -> Operation_hash.Table.add pending op (fetch op))
unknown_ops ;
List.iter (fun op ->
Lwt.ignore_result
(Distributed_db.Operation.fetch
(* TODO allow to adjust the constant ... *)
~timeout:10.
net_db ~peer:gid op ()))
2017-03-04 12:53:44 +04:00
known_ops ;
Lwt.return_unit
| `Handle op ->
2016-09-08 21:13:10 +04:00
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
2017-03-04 12:53:44 +04:00
Operation_hash.Table.remove pending op ;
2016-09-08 21:13:10 +04:00
broadcast_unprocessed := true ;
unprocessed := Operation_hash.Set.singleton op ;
2017-03-04 12:53:44 +04:00
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
2016-09-08 21:13:10 +04:00
Lwt.return_unit
| `Flush (new_head : State.Block.t) ->
list_pendings ~from_block:!head ~to_block:new_head
(preapply_result_operations !operations) >>= fun new_mempool ->
Chain_traversal.live_blocks
new_head
(State.Block.max_operations_ttl new_head)
>>= fun (new_live_blocks, new_live_operations) ->
2016-09-08 21:13:10 +04:00
lwt_debug "flush %a (mempool: %d)"
Block_hash.pp_short (State.Block.hash new_head)
(Operation_hash.Set.cardinal new_mempool) >>= fun () ->
2016-09-08 21:13:10 +04:00
(* Reset the pre-validation context *)
head := new_head ;
operations := empty_result ;
2016-09-08 21:13:10 +04:00
broadcast_unprocessed := false ;
unprocessed := new_mempool ;
timestamp := Time.now () ;
live_blocks := new_live_blocks ;
live_operations := new_live_operations ;
(* Reset the prevalidation context. *)
reset_validation_state new_head !timestamp)
2016-09-08 21:13:10 +04:00
q >>= fun () ->
worker_loop ()
in
Lwt_utils.worker "prevalidator" ~run:worker_loop ~cancel in
let flush head =
push_to_worker (`Flush head) ;
2016-09-08 21:13:10 +04:00
if not (Lwt.is_sleeping !running_validation) then
Lwt.cancel !running_validation
in
2017-03-04 12:53:44 +04:00
let notify_operations gid ops =
Lwt.async begin fun () ->
2017-03-04 12:53:44 +04:00
push_to_worker (`Register (gid, ops)) ;
Lwt.return_unit
end in
2016-09-08 21:13:10 +04:00
let prevalidate_operations force raw_ops =
let ops = List.map Operation.hash raw_ops in
2016-09-08 21:13:10 +04:00
let ops_map =
List.fold_left
(fun map op ->
Operation_hash.Map.add (Operation.hash op) op map)
Operation_hash.Map.empty raw_ops in
2016-09-08 21:13:10 +04:00
let wait, waker = Lwt.wait () in
push_to_worker (`Prevalidate (ops_map, waker, force));
wait >>=? fun result ->
return (ops, result) in
let shutdown () =
lwt_debug "shutdown" >>= fun () ->
if not (Lwt.is_sleeping !running_validation) then
Lwt.cancel !running_validation;
cancel () >>= fun () ->
prevalidation_worker in
let pending ?block () =
let ops = preapply_result_operations !operations in
match block with
| None -> Lwt.return ops
| Some to_block -> list_pendings ~from_block:!head ~to_block ops in
let context () =
Lwt.return !validation_state >>=? fun prevalidation_state ->
Prevalidation.end_prevalidation prevalidation_state in
2016-09-08 21:13:10 +04:00
Lwt.return {
net_db ;
2016-09-08 21:13:10 +04:00
flush ;
2017-03-04 12:53:44 +04:00
notify_operations ;
2016-09-08 21:13:10 +04:00
prevalidate_operations ;
operations =
(fun () ->
{ !operations with applied = List.rev !operations.applied },
!unprocessed) ;
pending ;
2016-09-08 21:13:10 +04:00
timestamp = (fun () -> !timestamp) ;
context ;
2016-09-08 21:13:10 +04:00
shutdown ;
}
let flush pv head = pv.flush head
2017-03-04 12:53:44 +04:00
let notify_operations pv = pv.notify_operations
2016-09-08 21:13:10 +04:00
let prevalidate_operations pv = pv.prevalidate_operations
let operations pv = pv.operations ()
let pending ?block pv = pv.pending ?block ()
2016-09-08 21:13:10 +04:00
let timestamp pv = pv.timestamp ()
let context pv = pv.context ()
let shutdown pv = pv.shutdown ()
let inject_operation pv ?(force = false) (op: Operation.t) =
let net_id = State.Net.id (Distributed_db.net_state pv.net_db) in
2016-09-08 21:13:10 +04:00
let wrap_error h map =
begin
try return (Operation_hash.Map.find h map)
2016-09-08 21:13:10 +04:00
with Not_found ->
failwith "unexpected protocol result"
end >>=? fun errors ->
Lwt.return (Error errors) in
2017-03-31 15:04:05 +04:00
fail_unless (Net_id.equal net_id op.shell.net_id)
(failure
2016-09-08 21:13:10 +04:00
"Prevalidator.inject_operation: invalid network") >>=? fun () ->
pv.prevalidate_operations force [op] >>=? function
| ([h], { applied = [h'] }) when Operation_hash.equal h h' ->
2016-09-08 21:13:10 +04:00
return ()
| ([h], { refused })
when Operation_hash.Map.cardinal refused = 1 ->
2016-09-08 21:13:10 +04:00
wrap_error h refused
| ([h], { branch_refused })
when Operation_hash.Map.cardinal branch_refused = 1 && not force ->
2016-09-08 21:13:10 +04:00
wrap_error h branch_refused
| ([h], { branch_delayed })
when Operation_hash.Map.cardinal branch_delayed = 1 && not force ->
2016-09-08 21:13:10 +04:00
wrap_error h branch_delayed
| _ ->
if force then
return ()
else
failwith "Unexpected result for prevalidation."