(**************************************************************************) (* *) (* Copyright (c) 2014 - 2017. *) (* Dynamic Ledger Solutions, Inc. *) (* *) (* All rights reserved. No warranty, explicit or implicit, provided. *) (* *) (**************************************************************************) open Logging.Node.Prevalidator let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = let rec pop_blocks ancestor block mempool = let hash = State.Block.hash block in if Block_hash.equal hash ancestor then Lwt.return mempool else State.Block.all_operations block >>= fun operations -> Lwt_list.fold_left_s (Lwt_list.fold_left_s (fun mempool op -> let h = Operation.hash op in Lwt_utils.may maintain_net_db ~f:begin fun net_db -> Distributed_db.inject_operation net_db h op >>= fun _ -> Lwt.return_unit end >>= fun () -> Lwt.return (Operation_hash.Map.add h op mempool))) mempool operations >>= fun mempool -> State.Block.predecessor block >>= function | None -> assert false | Some predecessor -> pop_blocks ancestor predecessor mempool in let push_block mempool block = State.Block.all_operation_hashes block >|= fun operations -> Option.iter maintain_net_db ~f:(fun net_db -> List.iter (List.iter (Distributed_db.Operation.clear_or_cancel net_db)) operations) ; List.fold_left (List.fold_left (fun mempool h -> Operation_hash.Map.remove h mempool)) mempool operations in Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, path) -> pop_blocks (State.Block.hash ancestor) from_block old_mempool >>= fun mempool -> Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool -> Lwt.return new_mempool type 'a request = | Flush : State.Block.t -> unit request | Notify : P2p.Peer_id.t * Mempool.t -> unit request | Inject : Operation.t -> unit tzresult request | Arrived : Operation_hash.t * Operation.t -> unit request type message = Message: 'a request * 'a tzresult Lwt.u option -> message let wakeup_with_result : type t. t request -> t tzresult Lwt.u option -> (t request -> t tzresult Lwt.t) -> unit tzresult Lwt.t = fun req u cb -> match u with | None -> cb req >>=? fun _res -> return () | Some u -> cb req >>= fun res -> Lwt.wakeup_later u res ; Lwt.return (res >>? fun _res -> ok ()) (* Invariants: - an operation is in only one of these sets (map domains): pv.refused pv.pending pv.fetching pv.live_operations pv.in_mempool - pv.in_mempool is the domain of all fields of pv.prevalidation_result - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused *) type t = { net_db : Distributed_db.net_db ; operation_timeout : float ; max_operations : int ; (* TODO: not sure if we should use that ? *) canceler : Lwt_canceler.t ; message_queue : message Lwt_pipe.t ; mutable (* just for init *) worker : unit Lwt.t ; mutable predecessor : State.Block.t ; mutable timestamp : Time.t ; mutable live_blocks : Block_hash.Set.t ; (* just a cache *) mutable live_operations : Operation_hash.Set.t ; (* just a cache *) mutable refused : (Time.t * error list) Operation_hash.Map.t ; mutable fetching : Operation_hash.Set.t ; mutable pending : Operation.t Operation_hash.Map.t ; mutable mempool : Mempool.t ; mutable in_mempool : Operation_hash.Set.t ; mutable validation_result : error Preapply_result.t ; mutable validation_state : Prevalidation.prevalidation_state tzresult ; } type error += Closed let push_request pv request = Lwt_pipe.safe_push_now pv.message_queue (Message (request, None)) let push_request_and_wait pv request = let t, u = Lwt.wait () in Lwt.catch (fun () -> Lwt_pipe.push_now_exn pv.message_queue (Message (request, Some u)) ; t) (function | Lwt_pipe.Closed -> fail Closed | exn -> fail (Exn exn)) let close_queue pv = let messages = Lwt_pipe.pop_all_now pv.message_queue in List.iter (function | Message (_, Some u) -> Lwt.wakeup_later u (Error [ Closed ]) | _ -> ()) messages ; Lwt_pipe.close pv.message_queue let already_handled pv oph = Operation_hash.Map.mem oph pv.refused || Operation_hash.Map.mem oph pv.pending || Operation_hash.Set.mem oph pv.fetching || Operation_hash.Set.mem oph pv.live_operations || Operation_hash.Set.mem oph pv.in_mempool let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = { Mempool.known_valid = fst (List.split r.applied) ; pending = Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) r.branch_delayed @@ Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) r.branch_refused @@ Operation_hash.Set.empty } let merge_validation_results ~old ~neu = let open Preapply_result in let merge _key a b = match a, b with | None, None -> None | Some x, None -> Some x | _, Some y -> Some y in let filter_out s m = List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in { applied = List.rev_append neu.applied old.applied ; refused = Operation_hash.Map.empty ; branch_refused = Operation_hash.Map.merge merge (* filtering should not be required if the protocol is sound *) (filter_out neu.applied old.branch_refused) neu.branch_refused ; branch_delayed = Operation_hash.Map.merge merge (filter_out neu.applied old.branch_delayed) neu.branch_delayed } let handle_unprocessed pv = begin match pv.validation_state with | Error err -> pv.validation_result <- { Preapply_result.empty with branch_delayed = Operation_hash.Map.fold (fun h op m -> Operation_hash.Map.add h (op, err) m) pv.pending Operation_hash.Map.empty } ; pv.pending <- Operation_hash.Map.empty ; Lwt.return () | Ok validation_state -> if Operation_hash.Map.is_empty pv.pending then Lwt.return () else begin match Operation_hash.Map.cardinal pv.pending with | 0 -> Lwt.return () | n -> lwt_debug "processing %d operations" n end >>= fun () -> Prevalidation.prevalidate validation_state ~sort:true (Operation_hash.Map.bindings pv.pending) >>= fun (validation_state, validation_result) -> pv.validation_state <- Ok validation_state ; pv.in_mempool <- (Operation_hash.Map.fold (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) pv.pending @@ Operation_hash.Map.fold (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) pv.validation_result.refused @@ pv.in_mempool) ; pv.refused <- (* TODO: cleanup *) (let now = Time.now () in Operation_hash.Map.fold (fun h (_, errs) refused -> Operation_hash.Map.add h (now, errs) refused) pv.validation_result.refused pv.refused) ; Operation_hash.Map.iter (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph) pv.validation_result.refused ; pv.validation_result <- merge_validation_results ~old:pv.validation_result ~neu:validation_result ; pv.pending <- Operation_hash.Map.empty ; Distributed_db.Advertise.current_head pv.net_db ~mempool: (mempool_of_prevalidation_result validation_result) pv.predecessor ; Lwt.return () end >>= fun () -> pv.mempool <- { Mempool.known_valid = fst (List.split pv.validation_result.applied) ; pending = Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) pv.validation_result.branch_delayed @@ Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) pv.validation_result.branch_refused @@ Operation_hash.Set.empty } ; Mempool.set (Distributed_db.net_state pv.net_db) ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> Lwt.return () let fetch_operation pv ?peer oph = debug "fetching operation %a" Operation_hash.pp_short oph ; Distributed_db.Operation.fetch ~timeout:pv.operation_timeout pv.net_db ?peer oph () >>= function | Ok op -> push_request pv (Arrived (oph, op)) ; Lwt.return_unit | Error [ Distributed_db.Operation.Canceled _ ] -> lwt_debug "operation %a included before being prevalidated" Operation_hash.pp_short oph >>= fun () -> Lwt.return_unit | Error _ -> (* should not happen *) Lwt.return_unit let clear_fetching pv = Operation_hash.Set.iter (Distributed_db.Operation.clear_or_cancel pv.net_db) pv.fetching let on_operation_arrived pv oph op = debug "operation %a retrieved" Operation_hash.pp_short oph ; pv.fetching <- Operation_hash.Set.remove oph pv.fetching ; if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin Distributed_db.Operation.clear_or_cancel pv.net_db oph (* TODO: put in a specific delayed map ? *) end else if not (already_handled pv oph) (* prevent double inclusion on flush *) then begin pv.pending <- Operation_hash.Map.add oph op pv.pending end let on_inject pv op = let oph = Operation.hash op in log_notice "injection of operation %a" Operation_hash.pp_short oph ; begin begin if already_handled pv oph then return pv.validation_result else Lwt.return pv.validation_state >>=? fun validation_state -> Prevalidation.prevalidate validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> match result.applied with | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> Distributed_db.inject_operation pv.net_db oph op >>= fun (_ : bool) -> pv.pending <- Operation_hash.Map.add oph op pv.pending ; return result | _ -> return result end >>=? fun result -> if List.mem_assoc oph result.applied then return () else let try_in_map map or_else = try Lwt.return (Error (snd (Operation_hash.Map.find oph map))) with Not_found -> or_else () in try_in_map pv.refused @@ fun () -> try_in_map result.refused @@ fun () -> try_in_map result.branch_refused @@ fun () -> try_in_map result.branch_delayed @@ fun () -> if Operation_hash.Set.mem oph pv.live_operations then failwith "Injected operation %a included in a previous block." Operation_hash.pp oph else failwith "Injected operation %a is not in prevalidation result." Operation_hash.pp oph end >>= fun tzresult -> return tzresult let on_notify pv peer mempool = let all_ophs = List.fold_left (fun s oph -> Operation_hash.Set.add oph s) mempool.Mempool.pending mempool.known_valid in let to_fetch = Operation_hash.Set.filter (fun oph -> not (already_handled pv oph)) all_ophs in debug "notification of %d new operations" (Operation_hash.Set.cardinal to_fetch) ; pv.fetching <- Operation_hash.Set.union to_fetch pv.fetching ; Operation_hash.Set.iter (fun oph -> Lwt.ignore_result (fetch_operation ~peer pv oph)) to_fetch let on_flush pv predecessor = list_pendings ~maintain_net_db:pv.net_db ~from_block:pv.predecessor ~to_block:predecessor (Preapply_result.operations pv.validation_result) >>= fun pending -> let timestamp = Time.now () in Chain_traversal.live_blocks predecessor (State.Block.max_operations_ttl predecessor) >>= fun (new_live_blocks, new_live_operations) -> Prevalidation.start_prevalidation ~predecessor ~timestamp () >>= fun validation_state -> begin match validation_state with | Error _ -> Lwt.return (validation_state, Preapply_result.empty) | Ok validation_state -> Prevalidation.prevalidate validation_state ~sort:false [] >>= fun (state, result) -> Lwt.return (Ok state, result) end >>= fun (validation_state, validation_result) -> lwt_log_notice "flushing the mempool for new head %a (%d operations)" Block_hash.pp_short (State.Block.hash predecessor) (Operation_hash.Map.cardinal pending) >>= fun () -> pv.predecessor <- predecessor ; pv.live_blocks <- new_live_blocks ; pv.live_operations <- new_live_operations ; pv.timestamp <- timestamp ; pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty }; pv.pending <- pending ; pv.in_mempool <- Operation_hash.Set.empty ; pv.validation_result <- validation_result ; pv.validation_state <- validation_state ; return () let rec worker_loop pv = begin handle_unprocessed pv >>= fun () -> Lwt_utils.protect ~canceler:pv.canceler begin fun () -> Lwt_pipe.pop pv.message_queue >>= return end >>=? fun (Message (message, u)) -> wakeup_with_result message u @@ function | Flush block -> on_flush pv block >>=? fun () -> return () | Notify (peer, mempool) -> on_notify pv peer mempool ; return () | Inject op -> on_inject pv op | Arrived (oph, op) -> on_operation_arrived pv oph op ; return () end >>= function | Ok () -> worker_loop pv | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> close_queue pv ; clear_fetching pv ; Lwt.return_unit | Error err -> lwt_log_error "@[Unexpected error:@ %a@]" pp_print_error err >>= fun () -> close_queue pv ; clear_fetching pv ; Lwt_canceler.cancel pv.canceler >>= fun () -> Lwt.return_unit let create ~max_operations ~operation_timeout net_db = let net_state = Distributed_db.net_state net_db in let canceler = Lwt_canceler.create () in let message_queue = Lwt_pipe.create () in State.read_chain_store net_state (fun _ { current_head ; current_mempool ; live_blocks ; live_operations } -> Lwt.return (current_head, current_mempool, live_blocks, live_operations)) >>= fun (predecessor, mempool, live_blocks, live_operations) -> let timestamp = Time.now () in Prevalidation.start_prevalidation ~predecessor ~timestamp () >>= fun validation_state -> begin match validation_state with | Error _ -> Lwt.return (validation_state, Preapply_result.empty) | Ok validation_state -> Prevalidation.prevalidate validation_state ~sort:false [] >>= fun (validation_state, validation_result) -> Lwt.return (Ok validation_state, validation_result) end >>= fun (validation_state, validation_result) -> let fetching = List.fold_left (fun s h -> Operation_hash.Set.add h s) Operation_hash.Set.empty mempool.known_valid in let pv = { operation_timeout ; max_operations ; net_db ; canceler ; worker = Lwt.return_unit ; message_queue ; predecessor ; timestamp ; live_blocks ; live_operations ; mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; refused = Operation_hash.Map.empty ; fetching ; pending = Operation_hash.Map.empty ; in_mempool = Operation_hash.Set.empty ; validation_result ; validation_state } in List.iter (fun oph -> Lwt.ignore_result (fetch_operation pv oph)) mempool.known_valid ; pv.worker <- Lwt_utils.worker (Format.asprintf "net_prevalidator.%a" Net_id.pp (State.Net.id net_state)) ~run:(fun () -> worker_loop pv) ~cancel:(fun () -> Lwt_canceler.cancel pv.canceler) ; Lwt.return pv let shutdown pv = lwt_debug "shutdown" >>= fun () -> Lwt_canceler.cancel pv.canceler >>= fun () -> pv.worker let flush pv head = push_request pv (Flush head) let notify_operations pv peer mempool = push_request pv (Notify (peer, mempool)) let operations pv = { pv.validation_result with applied = List.rev pv.validation_result.applied }, pv.pending let pending ?block pv = let ops = Preapply_result.operations pv.validation_result in match block with | Some to_block -> list_pendings ~maintain_net_db:pv.net_db ~from_block:pv.predecessor ~to_block ops | None -> Lwt.return ops let timestamp pv = pv.timestamp let context pv = Lwt.return pv.validation_state >>=? fun validation_state -> Prevalidation.end_prevalidation validation_state let inject_operation pv op = push_request_and_wait pv (Inject op) >>=? fun result -> Lwt.return result