Node: throttle advertisement of newly discovered operations a little bit

This commit is contained in:
Benjamin Canou 2017-11-29 17:39:43 +01:00 committed by Grégoire Henry
parent 755d63c0ef
commit cd94b998c0

View File

@ -53,6 +53,7 @@ type 'a request =
| Notify : P2p.Peer_id.t * Mempool.t -> unit request
| Inject : Operation.t -> unit tzresult request
| Arrived : Operation_hash.t * Operation.t -> unit request
| Advertise : unit request
type message = Message: 'a request * 'a tzresult Lwt.u option -> message
@ -98,6 +99,7 @@ type t = {
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.prevalidation_state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ;
}
type error += Closed
@ -163,6 +165,20 @@ let merge_validation_results ~old ~neu =
(filter_out neu.applied old.branch_delayed)
neu.branch_delayed }
let advertise pv mempool =
match pv.advertisement with
| `Pending { Mempool.known_valid ; pending } ->
pv.advertisement <-
`Pending
{ known_valid = known_valid @ mempool.Mempool.known_valid ;
pending = Operation_hash.Set.union pending mempool.pending }
| `None ->
pv.advertisement <- `Pending mempool ;
Lwt.async (fun () ->
Lwt_unix.sleep 0.01 >>= fun () ->
push_request pv Advertise ;
Lwt.return_unit)
let handle_unprocessed pv =
begin match pv.validation_state with
| Error err ->
@ -212,10 +228,8 @@ let handle_unprocessed pv =
~neu:validation_result ;
pv.pending <-
Operation_hash.Map.empty ;
Distributed_db.Advertise.current_head
pv.net_db
~mempool: (mempool_of_prevalidation_result validation_result)
pv.predecessor ;
advertise pv
(mempool_of_prevalidation_result validation_result) ;
Lwt.return ()
end >>= fun () ->
pv.mempool <-
@ -353,6 +367,13 @@ let on_flush pv predecessor =
pv.validation_state <- validation_state ;
return ()
let on_advertise pv =
match pv.advertisement with
| `None -> () (* should not happen *)
| `Pending mempool ->
pv.advertisement <- `None ;
Distributed_db.Advertise.current_head pv.net_db ~mempool pv.predecessor
let rec worker_loop pv =
begin
handle_unprocessed pv >>= fun () ->
@ -361,6 +382,8 @@ let rec worker_loop pv =
end >>=? fun (Message (message, u)) ->
wakeup_with_result message u @@ function
| Flush block ->
on_advertise pv ;
(* TODO: rebase the advertisement instead *)
on_flush pv block >>=? fun () ->
return ()
| Notify (peer, mempool) ->
@ -371,12 +394,15 @@ let rec worker_loop pv =
| Arrived (oph, op) ->
on_operation_arrived pv oph op ;
return ()
| Advertise ->
on_advertise pv ;
return ()
end >>= function
| Ok () ->
worker_loop pv
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
close_queue pv ;
clear_fetching pv ;
close_queue pv ;
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error:@ %a@]"
@ -419,7 +445,8 @@ let create limits net_db =
fetching ;
pending = Operation_hash.Map.empty ;
in_mempool = Operation_hash.Set.empty ;
validation_result ; validation_state } in
validation_result ; validation_state ;
advertisement = `None } in
List.iter
(fun oph -> Lwt.ignore_result (fetch_operation pv oph))
mempool.known_valid ;