diff --git a/src/lib_node_shell/prevalidator.ml b/src/lib_node_shell/prevalidator.ml index 45fd4c2f1..a3094eca4 100644 --- a/src/lib_node_shell/prevalidator.ml +++ b/src/lib_node_shell/prevalidator.ml @@ -53,6 +53,7 @@ type 'a request = | Notify : P2p.Peer_id.t * Mempool.t -> unit request | Inject : Operation.t -> unit tzresult request | Arrived : Operation_hash.t * Operation.t -> unit request + | Advertise : unit request type message = Message: 'a request * 'a tzresult Lwt.u option -> message @@ -98,6 +99,7 @@ type t = { mutable in_mempool : Operation_hash.Set.t ; mutable validation_result : error Preapply_result.t ; mutable validation_state : Prevalidation.prevalidation_state tzresult ; + mutable advertisement : [ `Pending of Mempool.t | `None ] ; } type error += Closed @@ -163,6 +165,20 @@ let merge_validation_results ~old ~neu = (filter_out neu.applied old.branch_delayed) neu.branch_delayed } +let advertise pv mempool = + match pv.advertisement with + | `Pending { Mempool.known_valid ; pending } -> + pv.advertisement <- + `Pending + { known_valid = known_valid @ mempool.Mempool.known_valid ; + pending = Operation_hash.Set.union pending mempool.pending } + | `None -> + pv.advertisement <- `Pending mempool ; + Lwt.async (fun () -> + Lwt_unix.sleep 0.01 >>= fun () -> + push_request pv Advertise ; + Lwt.return_unit) + let handle_unprocessed pv = begin match pv.validation_state with | Error err -> @@ -212,10 +228,8 @@ let handle_unprocessed pv = ~neu:validation_result ; pv.pending <- Operation_hash.Map.empty ; - Distributed_db.Advertise.current_head - pv.net_db - ~mempool: (mempool_of_prevalidation_result validation_result) - pv.predecessor ; + advertise pv + (mempool_of_prevalidation_result validation_result) ; Lwt.return () end >>= fun () -> pv.mempool <- @@ -353,6 +367,13 @@ let on_flush pv predecessor = pv.validation_state <- validation_state ; return () +let on_advertise pv = + match pv.advertisement with + | `None -> () (* should not happen *) + | `Pending mempool -> + pv.advertisement <- `None ; + Distributed_db.Advertise.current_head pv.net_db ~mempool pv.predecessor + let rec worker_loop pv = begin handle_unprocessed pv >>= fun () -> @@ -361,6 +382,8 @@ let rec worker_loop pv = end >>=? fun (Message (message, u)) -> wakeup_with_result message u @@ function | Flush block -> + on_advertise pv ; + (* TODO: rebase the advertisement instead *) on_flush pv block >>=? fun () -> return () | Notify (peer, mempool) -> @@ -371,12 +394,15 @@ let rec worker_loop pv = | Arrived (oph, op) -> on_operation_arrived pv oph op ; return () + | Advertise -> + on_advertise pv ; + return () end >>= function | Ok () -> worker_loop pv | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> - close_queue pv ; clear_fetching pv ; + close_queue pv ; Lwt.return_unit | Error err -> lwt_log_error "@[Unexpected error:@ %a@]" @@ -419,7 +445,8 @@ let create limits net_db = fetching ; pending = Operation_hash.Map.empty ; in_mempool = Operation_hash.Set.empty ; - validation_result ; validation_state } in + validation_result ; validation_state ; + advertisement = `None } in List.iter (fun oph -> Lwt.ignore_result (fetch_operation pv oph)) mempool.known_valid ;