diff --git a/src/lib_node_shell/distributed_db.ml b/src/lib_node_shell/distributed_db.ml index 26cbe6228..658ebeac5 100644 --- a/src/lib_node_shell/distributed_db.ml +++ b/src/lib_node_shell/distributed_db.ml @@ -799,8 +799,7 @@ let commit_invalid_block net_db hash header errors = let inject_operation net_db h op = assert (Operation_hash.equal h (Operation.hash op)) ; - Raw_operation.Table.inject net_db.operation_db.table h op >>= fun res -> - return res + Raw_operation.Table.inject net_db.operation_db.table h op let commit_protocol db h p = State.Protocol.store db.disk p >>= fun res -> diff --git a/src/lib_node_shell/distributed_db.mli b/src/lib_node_shell/distributed_db.mli index bb9bd2efc..1f99b701d 100644 --- a/src/lib_node_shell/distributed_db.mli +++ b/src/lib_node_shell/distributed_db.mli @@ -232,7 +232,7 @@ end (** Inject a new operation in the local index (memory only). *) val inject_operation: - net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t + net_db -> Operation_hash.t -> Operation.t -> bool Lwt.t (** Monitor all the fetched operations (for all activate networks). *) val watch_operation: diff --git a/src/lib_node_shell/prevalidator.ml b/src/lib_node_shell/prevalidator.ml index fee49e483..c98b4c482 100644 --- a/src/lib_node_shell/prevalidator.ml +++ b/src/lib_node_shell/prevalidator.ml @@ -8,7 +8,6 @@ (**************************************************************************) open Logging.Node.Prevalidator -open Preapply_result let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = let rec pop_blocks ancestor block mempool = @@ -49,421 +48,413 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool -> Lwt.return new_mempool +type 'a request = + | Flush : State.Block.t -> unit request + | Notify : P2p.Peer_id.t * Mempool.t -> unit request + | Inject : Operation.t -> unit tzresult request + | Arrived : Operation_hash.t * Operation.t -> unit request -(** Worker *) +type message = Message: 'a request * 'a tzresult Lwt.u option -> message -open Prevalidation +let wakeup_with_result + : type t. + t request -> + t tzresult Lwt.u option -> + (t request -> t tzresult Lwt.t) -> + unit tzresult Lwt.t + = fun req u cb -> match u with + | None -> + cb req >>=? fun _res -> return () + | Some u -> + cb req >>= fun res -> + Lwt.wakeup_later u res ; + Lwt.return (res >>? fun _res -> ok ()) +(* Invariants: + - an operation is in only one of these sets (map domains): + pv.refused pv.pending pv.fetching pv.live_operations pv.in_mempool + - pv.in_mempool is the domain of all fields of pv.prevalidation_result + - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused *) type t = { - net_db: Distributed_db.net_db ; - flush: State.Block.t -> unit; - notify_operations: P2p.Peer_id.t -> Mempool.t -> unit ; - prevalidate_operations: - bool -> Operation.t list -> - (Operation_hash.t list * error Preapply_result.t) tzresult Lwt.t ; - operations: unit -> error Preapply_result.t * Operation.t Operation_hash.Map.t ; - pending: ?block:State.Block.t -> unit -> Operation.t Operation_hash.Map.t Lwt.t ; - timestamp: unit -> Time.t ; - context: unit -> Updater.validation_result tzresult Lwt.t ; - shutdown: unit -> unit Lwt.t ; + net_db : Distributed_db.net_db ; + operation_timeout : float ; + max_operations : int ; (* TODO: not sure if we should use that ? *) + canceler : Lwt_canceler.t ; + message_queue : message Lwt_pipe.t ; + mutable (* just for init *) worker : unit Lwt.t ; + mutable predecessor : State.Block.t ; + mutable timestamp : Time.t ; + mutable live_blocks : Block_hash.Set.t ; (* just a cache *) + mutable live_operations : Operation_hash.Set.t ; (* just a cache *) + mutable refused : (Time.t * error list) Operation_hash.Map.t ; + mutable fetching : Operation_hash.Set.t ; + mutable pending : Operation.t Operation_hash.Map.t ; + mutable mempool : Mempool.t ; + mutable in_mempool : Operation_hash.Set.t ; + mutable validation_result : error Preapply_result.t ; + mutable validation_state : Prevalidation.prevalidation_state tzresult ; } -let merge _key a b = - match a, b with - | None, None -> None - | Some x, None -> Some x - | _, Some y -> Some y +type error += Closed -let create - ~max_operations - ~operation_timeout - net_db = +let push_request pv request = + Lwt_pipe.safe_push_now pv.message_queue (Message (request, None)) - let net_state = Distributed_db.net_state net_db in +let push_request_and_wait pv request = + let t, u = Lwt.wait () in + Lwt.catch + (fun () -> + Lwt_pipe.push_now_exn pv.message_queue (Message (request, Some u)) ; + t) + (function + | Lwt_pipe.Closed -> fail Closed + | exn -> fail (Exn exn)) - let cancelation, cancel, _on_cancel = Lwt_utils.canceler () in - let push_to_worker, worker_waiter = Lwt_utils.queue () in +let close_queue pv = + let messages = Lwt_pipe.pop_all_now pv.message_queue in + List.iter + (function + | Message (_, Some u) -> Lwt.wakeup_later u (Error [ Closed ]) + | _ -> ()) + messages ; + Lwt_pipe.close pv.message_queue - Chain.head net_state >>= fun head -> - let timestamp = ref (Time.now ()) in - let max_number_of_operations = - try 2 * List.hd (State.Block.max_number_of_operations head) - with _ -> 0 in - (start_prevalidation - ~max_number_of_operations - ~predecessor:head - ~timestamp:!timestamp () >|= ref) >>= fun validation_state -> - let pending = Operation_hash.Table.create 53 in - let head = ref head in - let mempool = ref Mempool.empty in - let operations = ref Preapply_result.empty in - let operation_count = ref 0 in (* unprocessed + operations/mempool *) - Chain_traversal.live_blocks - !head - (State.Block.max_operations_ttl !head) - >>= fun (live_blocks, live_operations) -> - let live_blocks = ref live_blocks in - let live_operations = ref live_operations in - let running_validation = ref Lwt.return_unit in - let unprocessed = ref Operation_hash.Map.empty in - let broadcast_unprocessed = ref false in +let already_handled pv oph = + Operation_hash.Map.mem oph pv.refused + || Operation_hash.Map.mem oph pv.pending + || Operation_hash.Set.mem oph pv.fetching + || Operation_hash.Set.mem oph pv.live_operations + || Operation_hash.Set.mem oph pv.in_mempool - let set_validation_state state = - validation_state := state; - Lwt.return_unit in +let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = + { Mempool.known_valid = fst (List.split r.applied) ; + pending = + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_delayed @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_refused @@ + Operation_hash.Set.empty } - let reset_validation_state head timestamp = - start_prevalidation ~predecessor:head ~timestamp () >>= fun state -> - validation_state := state; - Lwt.return_unit in +let merge_validation_results ~old ~neu = + let open Preapply_result in + let merge _key a b = + match a, b with + | None, None -> None + | Some x, None -> Some x + | _, Some y -> Some y in + let filter_out s m = + List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in + { applied = List.rev_append neu.applied old.applied ; + refused = Operation_hash.Map.empty ; + branch_refused = + Operation_hash.Map.merge merge + (* filtering should not be required if the protocol is sound *) + (filter_out neu.applied old.branch_refused) + neu.branch_refused ; + branch_delayed = + Operation_hash.Map.merge merge + (filter_out neu.applied old.branch_delayed) + neu.branch_delayed } - let broadcast_new_operations r = - Distributed_db.Advertise.current_head - net_db - ~mempool:{ - known_valid = [] ; - pending = - List.fold_right - (fun (k, _) s -> Operation_hash.Set.add k s) - r.applied @@ - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - r.branch_delayed @@ - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - r.branch_refused @@ - Operation_hash.Set.empty ; - } - !head - in - - let handle_unprocessed () = - if Operation_hash.Map.is_empty !unprocessed then - Lwt.return () - else - let ops = !unprocessed in - let broadcast = !broadcast_unprocessed in - unprocessed := Operation_hash.Map.empty ; - broadcast_unprocessed := false ; - let ops = - Operation_hash.Set.fold - (fun k m -> Operation_hash.Map.remove k m) - !live_operations ops in - live_operations := +let handle_unprocessed pv = + begin match pv.validation_state with + | Error err -> + pv.validation_result <- + { Preapply_result.empty with + branch_delayed = + Operation_hash.Map.fold + (fun h op m -> Operation_hash.Map.add h (op, err) m) + pv.pending Operation_hash.Map.empty } ; + pv.pending <- + Operation_hash.Map.empty ; + Lwt.return () + | Ok validation_state -> + if Operation_hash.Map.is_empty pv.pending then + Lwt.return () + else + begin match Operation_hash.Map.cardinal pv.pending with + | 0 -> Lwt.return () + | n -> lwt_debug "processing %d operations" n + end >>= fun () -> + Prevalidation.prevalidate validation_state ~sort:true + (Operation_hash.Map.bindings pv.pending) + >>= fun (validation_state, validation_result) -> + pv.validation_state <- + Ok validation_state ; + pv.in_mempool <- + (Operation_hash.Map.fold + (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) + pv.pending @@ + Operation_hash.Map.fold + (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) + pv.validation_result.refused @@ + pv.in_mempool) ; + pv.refused <- (* TODO: cleanup *) + (let now = Time.now () in + Operation_hash.Map.fold + (fun h (_, errs) refused -> + Operation_hash.Map.add h (now, errs) refused) + pv.validation_result.refused pv.refused) ; + Operation_hash.Map.iter + (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph) + pv.validation_result.refused ; + pv.validation_result <- + merge_validation_results + ~old:pv.validation_result + ~neu:validation_result ; + pv.pending <- + Operation_hash.Map.empty ; + Distributed_db.Advertise.current_head + pv.net_db + ~mempool: (mempool_of_prevalidation_result validation_result) + pv.predecessor ; + Lwt.return () + end >>= fun () -> + pv.mempool <- + { Mempool.known_valid = + fst (List.split pv.validation_result.applied) ; + pending = Operation_hash.Map.fold - (fun k _ m -> Operation_hash.Set.add k m) - ops !live_operations ; - running_validation := begin - begin - Lwt_list.filter_map_p - (fun (h, op) -> - if Block_hash.Set.mem op.Operation.shell.branch !live_blocks then - Lwt.return_some (h, op) - else begin - Distributed_db.Operation.clear_or_cancel net_db h ; - Lwt.return_none - end) - (Operation_hash.Map.bindings ops) >>= fun rops -> - operation_count := - !operation_count - Operation_hash.Map.cardinal ops + List.length rops ; - match !validation_state with - | Ok validation_state -> - prevalidate validation_state ~sort:true rops >>= fun (state, r) -> - Lwt.return (Ok state, r) - | Error err -> - let r = - { Preapply_result.empty with - branch_delayed = - List.fold_left - (fun m (h, op) -> Operation_hash.Map.add h (op, err) m) - Operation_hash.Map.empty rops ; } in - Lwt.return (!validation_state, r) - end >>= fun (state, r) -> - let filter_out s m = - List.fold_right (fun (h, _op) -> Operation_hash.Set.remove h) s m in - mempool := { - known_valid = !mempool.known_valid @ List.rev_map fst r.applied ; - pending = - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - r.branch_delayed @@ - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - r.branch_refused @@ - filter_out r.applied !mempool.pending ; - } ; - let filter_out s m = - List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in - operations := { - applied = List.rev_append r.applied !operations.applied ; - refused = Operation_hash.Map.empty ; - branch_refused = - Operation_hash.Map.merge merge - (* filter_out should not be required here, TODO warn ? *) - (filter_out r.applied !operations.branch_refused) - r.branch_refused ; - branch_delayed = - Operation_hash.Map.merge merge - (filter_out r.applied !operations.branch_delayed) - r.branch_delayed ; - } ; - Mempool.set net_state - ~head:(State.Block.hash !head) !mempool >>= fun () -> - if broadcast then broadcast_new_operations r ; - Lwt_list.iter_s - (fun (op, _exns) -> - Distributed_db.Operation.clear_or_cancel net_db op ; - Lwt.return_unit) - (Operation_hash.Map.bindings r.refused) >>= fun () -> - (* TODO. Keep a bounded set of 'refused' operations. *) - (* TODO. Log the error in some statistics associated to - the peers that informed us of the operations. And - eventually blacklist bad peers. *) - (* TODO. Keep a bounded set of 'branch_refused' operations - into the 'state'. It should be associated to the - current block, and updated on 'set_current_head'. *) - set_validation_state state - end; - Lwt.catch - (fun () -> !running_validation) - (fun _ -> lwt_debug "<- prevalidate (cancel)") - in + (fun k _ s -> Operation_hash.Set.add k s) + pv.validation_result.branch_delayed @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + pv.validation_result.branch_refused @@ + Operation_hash.Set.empty } ; + Mempool.set (Distributed_db.net_state pv.net_db) + ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> + Lwt.return () - let prevalidation_worker = - - let rec worker_loop () = - (* TODO lookup in `!pending` for 'outdated' ops and re-add them - in `unprocessed` (e.g. if the previous tentative was - more 5 seconds ago) *) - handle_unprocessed () >>= fun () -> - Lwt.pick [(worker_waiter () >|= fun q -> `Process q); - (cancelation () >|= fun () -> `Cancel)] >>= function - | `Cancel -> Lwt.return_unit - | `Process q -> - Lwt_list.iter_s - (function - | `Prevalidate (ops, w, force) -> begin - let result = - let rops = Operation_hash.Map.bindings ops in - Lwt.return !validation_state >>=? fun validation_state -> - prevalidate validation_state - ~sort:true rops >>= fun (state, res) -> - let register h op = - incr operation_count ; - live_operations := - Operation_hash.Set.add h !live_operations ; - Distributed_db.inject_operation - net_db h op >>=? fun (_ : bool) -> - return () in - iter_s - (fun (h, op) -> - register h op >>=? fun () -> - mempool := { !mempool with - known_valid = - !mempool.known_valid @ [h] } ; - operations := - { !operations with - applied = (h, op) :: !operations.applied } ; - return () ) - res.applied >>=? fun () -> - Mempool.set net_state - ~head:(State.Block.hash !head) !mempool >>= fun () -> - broadcast_new_operations res ; - begin - if force then - iter_p - (fun (h, (op, _exns)) -> register h op) - (Operation_hash.Map.bindings - res.branch_delayed) >>=? fun () -> - iter_p - (fun (h, (op, _exns)) -> register h op) - (Operation_hash.Map.bindings - res.branch_refused) >>=? fun () -> - operations := - { !operations with - branch_delayed = - Operation_hash.Map.merge merge - !operations.branch_delayed res.branch_delayed ; - branch_refused = - Operation_hash.Map.merge merge - !operations.branch_refused res.branch_refused ; - } ; - return () - else - return () - end >>=? fun () -> - set_validation_state (Ok state) >>= fun () -> - return res - in - result >>= fun result -> - Lwt.wakeup w result ; - Lwt.return_unit - end - | `Register (_gid, _mempool) when !operation_count >= max_operations -> - Lwt.return_unit - | `Register (gid, mempool) -> - let ops = - Operation_hash.Set.elements mempool.Mempool.pending @ - mempool.known_valid in - let known_ops, unknown_ops = - List.partition - (fun op -> - Operation_hash.Table.mem pending op - || Operation_hash.Set.mem op !live_operations) - ops in - let fetch h = - Distributed_db.Operation.fetch - ~timeout:operation_timeout - net_db ~peer:gid h () >>= function - | Ok op -> - push_to_worker (`Handle (h, op)) ; - Lwt.return_unit - | Error [ Distributed_db.Operation.Canceled _ ] -> - lwt_debug - "operation %a included before being prevalidated" - Operation_hash.pp_short h >>= fun () -> - Operation_hash.Table.remove pending h ; - Lwt.return_unit - | Error _ -> - Operation_hash.Table.remove pending h ; - Lwt.return_unit - in - List.iter - (fun op -> Operation_hash.Table.add pending op (fetch op)) - unknown_ops ; - List.iter - (fun op -> - Lwt.ignore_result - (Distributed_db.Operation.fetch - ~timeout:operation_timeout - net_db ~peer:gid op ())) - known_ops ; - Lwt.return_unit - | `Handle (h, op) -> - Operation_hash.Table.remove pending h ; - if !operation_count < max_operations then begin - broadcast_unprocessed := true ; - incr operation_count ; - unprocessed := Operation_hash.Map.singleton h op ; - lwt_debug "register %a" Operation_hash.pp_short h >>= fun () -> - Lwt.return_unit - end else begin - Distributed_db.Operation.clear_or_cancel net_db h ; - Lwt.return_unit - end - | `Flush (new_head : State.Block.t) -> - list_pendings - ~maintain_net_db:net_db - ~from_block:!head ~to_block:new_head - (Preapply_result.operations !operations) >>= fun new_mempool -> - Chain_traversal.live_blocks - new_head - (State.Block.max_operations_ttl new_head) - >>= fun (new_live_blocks, new_live_operations) -> - lwt_debug "flush %a (mempool: %d)" - Block_hash.pp_short (State.Block.hash new_head) - (Operation_hash.Map.cardinal new_mempool) >>= fun () -> - (* Reset the pre-validation context *) - head := new_head ; - mempool := Mempool.empty ; - operations := Preapply_result.empty ; - broadcast_unprocessed := false ; - unprocessed := new_mempool ; - operation_count := Operation_hash.Map.cardinal new_mempool ; - timestamp := Time.now () ; - live_blocks := new_live_blocks ; - live_operations := new_live_operations ; - (* Reset the prevalidation context. *) - reset_validation_state new_head !timestamp) - q >>= fun () -> - worker_loop () - in - Lwt_utils.worker - (Format.asprintf "prevalidator.%a" - Net_id.pp (State.Net.id net_state)) - ~run:worker_loop ~cancel in - - let flush head = - push_to_worker (`Flush head) ; - if not (Lwt.is_sleeping !running_validation) then - Lwt.cancel !running_validation - in - let notify_operations gid mempool = - Lwt.async begin fun () -> - push_to_worker (`Register (gid, mempool)) ; +let fetch_operation pv ?peer oph = + debug "fetching operation %a" Operation_hash.pp_short oph ; + Distributed_db.Operation.fetch + ~timeout:pv.operation_timeout + pv.net_db ?peer oph () >>= function + | Ok op -> + push_request pv (Arrived (oph, op)) ; + Lwt.return_unit + | Error [ Distributed_db.Operation.Canceled _ ] -> + lwt_debug + "operation %a included before being prevalidated" + Operation_hash.pp_short oph >>= fun () -> + Lwt.return_unit + | Error _ -> (* should not happen *) Lwt.return_unit - end in - let prevalidate_operations force raw_ops = - let ops = List.map Operation.hash raw_ops in - let ops_map = - List.fold_left - (fun map op -> - Operation_hash.Map.add (Operation.hash op) op map) - Operation_hash.Map.empty raw_ops in - let wait, waker = Lwt.wait () in - push_to_worker (`Prevalidate (ops_map, waker, force)); - wait >>=? fun result -> - return (ops, result) in - let shutdown () = - lwt_debug "shutdown" >>= fun () -> - if not (Lwt.is_sleeping !running_validation) then - Lwt.cancel !running_validation; - cancel () >>= fun () -> - prevalidation_worker in - let pending ?block () = - let ops = Preapply_result.operations !operations in - match block with - | None -> Lwt.return ops - | Some to_block -> list_pendings ~from_block:!head ~to_block ops in - let context () = - Lwt.return !validation_state >>=? fun prevalidation_state -> - Prevalidation.end_prevalidation prevalidation_state in - Lwt.return { - net_db ; - flush ; - notify_operations ; - prevalidate_operations ; - operations = - (fun () -> - { !operations with applied = List.rev !operations.applied }, - !unprocessed) ; - pending ; - timestamp = (fun () -> !timestamp) ; - context ; - shutdown ; - } -let flush pv head = pv.flush head -let notify_operations pv = pv.notify_operations -let prevalidate_operations pv = pv.prevalidate_operations -let operations pv = pv.operations () -let pending ?block pv = pv.pending ?block () -let timestamp pv = pv.timestamp () -let context pv = pv.context () -let shutdown pv = pv.shutdown () +let clear_fetching pv = + Operation_hash.Set.iter + (Distributed_db.Operation.clear_or_cancel pv.net_db) + pv.fetching -let inject_operation pv ?(force = false) (op: Operation.t) = - let wrap_error h map = - begin - try return (snd (Operation_hash.Map.find h map)) - with Not_found -> - failwith "unexpected protocol result" - end >>=? fun errors -> - Lwt.return (Error errors) in - pv.prevalidate_operations force [op] >>=? function - | ([h], { applied = [h', _] }) when Operation_hash.equal h h' -> - return () - | ([h], { refused }) - when Operation_hash.Map.cardinal refused = 1 -> - wrap_error h refused - | ([h], { branch_refused }) - when Operation_hash.Map.cardinal branch_refused = 1 && not force -> - wrap_error h branch_refused - | ([h], { branch_delayed }) - when Operation_hash.Map.cardinal branch_delayed = 1 && not force -> - wrap_error h branch_delayed - | _ -> - if force then - return () +let on_operation_arrived pv oph op = + debug "operation %a retrieved" Operation_hash.pp_short oph ; + pv.fetching <- Operation_hash.Set.remove oph pv.fetching ; + if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin + Distributed_db.Operation.clear_or_cancel pv.net_db oph + (* TODO: put in a specific delayed map ? *) + end else if not (already_handled pv oph) (* prevent double inclusion on flush *) then begin + pv.pending <- Operation_hash.Map.add oph op pv.pending + end + +let on_inject pv op = + let oph = Operation.hash op in + log_notice "injection of operation %a" Operation_hash.pp_short oph ; + begin + begin if already_handled pv oph then + return pv.validation_result else - failwith "Unexpected result for prevalidation." + Lwt.return pv.validation_state >>=? fun validation_state -> + Prevalidation.prevalidate + validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> + match result.applied with + | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> + Distributed_db.inject_operation pv.net_db oph op >>= fun (_ : bool) -> + pv.pending <- Operation_hash.Map.add oph op pv.pending ; + return result + | _ -> + return result + end >>=? fun result -> + if List.mem_assoc oph result.applied then + return () + else + let try_in_map map or_else = + try + Lwt.return (Error (snd (Operation_hash.Map.find oph map))) + with Not_found -> or_else () in + try_in_map pv.refused @@ fun () -> + try_in_map result.refused @@ fun () -> + try_in_map result.branch_refused @@ fun () -> + try_in_map result.branch_delayed @@ fun () -> + if Operation_hash.Set.mem oph pv.live_operations then + failwith "Injected operation %a included in a previous block." + Operation_hash.pp oph + else + failwith "Injected operation %a is not in prevalidation result." + Operation_hash.pp oph + end >>= fun tzresult -> + return tzresult + +let on_notify pv peer mempool = + let all_ophs = + List.fold_left + (fun s oph -> Operation_hash.Set.add oph s) + mempool.Mempool.pending mempool.known_valid in + let to_fetch = + Operation_hash.Set.filter + (fun oph -> not (already_handled pv oph)) + all_ophs in + debug "notification of %d new operations" (Operation_hash.Set.cardinal to_fetch) ; + pv.fetching <- + Operation_hash.Set.union + to_fetch + pv.fetching ; + Operation_hash.Set.iter + (fun oph -> Lwt.ignore_result (fetch_operation ~peer pv oph)) + to_fetch + +let on_flush pv predecessor = + list_pendings + ~maintain_net_db:pv.net_db + ~from_block:pv.predecessor ~to_block:predecessor + (Preapply_result.operations pv.validation_result) >>= fun pending -> + let timestamp = Time.now () in + Chain_traversal.live_blocks + predecessor + (State.Block.max_operations_ttl predecessor) + >>= fun (new_live_blocks, new_live_operations) -> + Prevalidation.start_prevalidation + ~predecessor ~timestamp () >>= fun validation_state -> + begin match validation_state with + | Error _ -> Lwt.return (validation_state, Preapply_result.empty) + | Ok validation_state -> + Prevalidation.prevalidate + validation_state ~sort:false [] >>= fun (state, result) -> + Lwt.return (Ok state, result) + end >>= fun (validation_state, validation_result) -> + lwt_log_notice "flushing the mempool for new head %a (%d operations)" + Block_hash.pp_short (State.Block.hash predecessor) + (Operation_hash.Map.cardinal pending) >>= fun () -> + pv.predecessor <- predecessor ; + pv.live_blocks <- new_live_blocks ; + pv.live_operations <- new_live_operations ; + pv.timestamp <- timestamp ; + pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty }; + pv.pending <- pending ; + pv.in_mempool <- Operation_hash.Set.empty ; + pv.validation_result <- validation_result ; + pv.validation_state <- validation_state ; + return () + +let rec worker_loop pv = + begin + handle_unprocessed pv >>= fun () -> + Lwt_utils.protect ~canceler:pv.canceler begin fun () -> + Lwt_pipe.pop pv.message_queue >>= return + end >>=? fun (Message (message, u)) -> + wakeup_with_result message u @@ function + | Flush block -> + on_flush pv block >>=? fun () -> + return () + | Notify (peer, mempool) -> + on_notify pv peer mempool ; + return () + | Inject op -> + on_inject pv op + | Arrived (oph, op) -> + on_operation_arrived pv oph op ; + return () + end >>= function + | Ok () -> + worker_loop pv + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + close_queue pv ; + clear_fetching pv ; + Lwt.return_unit + | Error err -> + lwt_log_error "@[Unexpected error:@ %a@]" + pp_print_error err >>= fun () -> + close_queue pv ; + clear_fetching pv ; + Lwt_canceler.cancel pv.canceler >>= fun () -> + Lwt.return_unit + +let create ~max_operations ~operation_timeout net_db = + let net_state = Distributed_db.net_state net_db in + let canceler = Lwt_canceler.create () in + let message_queue = Lwt_pipe.create () in + State.read_chain_store net_state + (fun _ { current_head ; current_mempool ; live_blocks ; live_operations } -> + Lwt.return (current_head, current_mempool, live_blocks, live_operations)) + >>= fun (predecessor, mempool, live_blocks, live_operations) -> + let timestamp = Time.now () in + Prevalidation.start_prevalidation + ~predecessor ~timestamp () >>= fun validation_state -> + begin match validation_state with + | Error _ -> Lwt.return (validation_state, Preapply_result.empty) + | Ok validation_state -> + Prevalidation.prevalidate validation_state ~sort:false [] + >>= fun (validation_state, validation_result) -> + + Lwt.return (Ok validation_state, validation_result) + end >>= fun (validation_state, validation_result) -> + let fetching = + List.fold_left + (fun s h -> Operation_hash.Set.add h s) + Operation_hash.Set.empty mempool.known_valid in + let pv = + { operation_timeout ; max_operations ; + net_db ; canceler ; + worker = Lwt.return_unit ; message_queue ; + predecessor ; timestamp ; live_blocks ; live_operations ; + mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; + refused = Operation_hash.Map.empty ; + fetching ; + pending = Operation_hash.Map.empty ; + in_mempool = Operation_hash.Set.empty ; + validation_result ; validation_state } in + List.iter + (fun oph -> Lwt.ignore_result (fetch_operation pv oph)) + mempool.known_valid ; + pv.worker <- + Lwt_utils.worker + (Format.asprintf "net_prevalidator.%a" Net_id.pp (State.Net.id net_state)) + ~run:(fun () -> worker_loop pv) + ~cancel:(fun () -> Lwt_canceler.cancel pv.canceler) ; + Lwt.return pv + +let shutdown pv = + lwt_debug "shutdown" >>= fun () -> + Lwt_canceler.cancel pv.canceler >>= fun () -> + pv.worker + +let flush pv head = + push_request pv (Flush head) + +let notify_operations pv peer mempool = + push_request pv (Notify (peer, mempool)) + +let operations pv = + { pv.validation_result with + applied = List.rev pv.validation_result.applied }, + pv.pending + +let pending ?block pv = + let ops = Preapply_result.operations pv.validation_result in + match block with + | Some to_block -> + list_pendings + ~maintain_net_db:pv.net_db + ~from_block:pv.predecessor ~to_block ops + | None -> Lwt.return ops + +let timestamp pv = pv.timestamp + +let context pv = + Lwt.return pv.validation_state >>=? fun validation_state -> + Prevalidation.end_prevalidation validation_state + +let inject_operation pv op = + push_request_and_wait pv (Inject op) >>=? fun result -> + Lwt.return result diff --git a/src/lib_node_shell/prevalidator.mli b/src/lib_node_shell/prevalidator.mli index a081c918b..695d516b8 100644 --- a/src/lib_node_shell/prevalidator.mli +++ b/src/lib_node_shell/prevalidator.mli @@ -43,7 +43,7 @@ val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit be ignored when it is (strongly) refused This is the entry-point used by the P2P layer. The operation content has been previously stored on disk. *) -val inject_operation: t -> ?force:bool -> Operation.t -> unit tzresult Lwt.t +val inject_operation: t -> Operation.t -> unit tzresult Lwt.t val flush: t -> State.Block.t -> unit val timestamp: t -> Time.t