diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index d369b490f..50a9d2833 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -74,7 +74,7 @@ exception Invalid_operation of Operation_hash.t type t = { net_db: Distributed_db.net ; flush: State.Valid_block.t -> unit; - notify_operation: P2p.Peer_id.t -> Operation_hash.t -> unit ; + notify_operations: P2p.Peer_id.t -> Operation_hash.t list -> unit ; prevalidate_operations: bool -> Store.Operation.t list -> (Operation_hash.t list * error Updater.preapply_result) tzresult Lwt.t ; @@ -111,6 +111,8 @@ let create net_db = | Error _ -> ref head.context | Ok (ctxt, _) -> ref ctxt end >>= fun context -> + + let pending = Operation_hash.Table.create 53 in let protocol = ref protocol in let head = ref head in let operations = ref Updater.empty_result in @@ -263,10 +265,34 @@ let create net_db = Lwt.wakeup w result ; Lwt.return_unit end - | `Register op -> + | `Register (gid, ops) -> + Lwt_list.filter_p + (fun op -> + Distributed_db.Operation.known net_db op >|= not) + ops >>= fun new_ops -> + let known_ops, unknown_ops = + List.partition + (fun op -> Operation_hash.Table.mem pending op) new_ops in + let fetch op = + Distributed_db.Operation.fetch + net_db ~peer:gid op >>= fun _op -> + push_to_worker (`Handle op) ; + Lwt.return_unit + in + List.iter + (fun op -> Operation_hash.Table.add pending op (fetch op)) + unknown_ops ; + List.iter (fun op -> + Lwt.ignore_result + (Distributed_db.Operation.fetch net_db ~peer:gid op)) + known_ops ; + Lwt.return_unit + | `Handle op -> lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> + Operation_hash.Table.remove pending op ; broadcast_unprocessed := true ; unprocessed := Operation_hash.Set.singleton op ; + lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> Lwt.return_unit | `Flush (new_head : State.Valid_block.t) -> let new_protocol = @@ -305,10 +331,9 @@ let create net_db = if not (Lwt.is_sleeping !running_validation) then Lwt.cancel !running_validation in - let notify_operation gid op = + let notify_operations gid ops = Lwt.async begin fun () -> - Distributed_db.Operation.fetch net_db ~peer:gid op >>= fun _ -> - push_to_worker (`Register op) ; + push_to_worker (`Register (gid, ops)) ; Lwt.return_unit end in let prevalidate_operations force raw_ops = @@ -339,7 +364,7 @@ let create net_db = Lwt.return { net_db ; flush ; - notify_operation ; + notify_operations ; prevalidate_operations ; operations = (fun () -> @@ -353,7 +378,7 @@ let create net_db = } let flush pv head = pv.flush head -let notify_operation pv = pv.notify_operation +let notify_operations pv = pv.notify_operations let prevalidate_operations pv = pv.prevalidate_operations let operations pv = pv.operations () let pending ?block pv = pv.pending ?block () diff --git a/src/node/shell/prevalidator.mli b/src/node/shell/prevalidator.mli index 3713cec81..7e17fd7d2 100644 --- a/src/node/shell/prevalidator.mli +++ b/src/node/shell/prevalidator.mli @@ -32,7 +32,7 @@ type t val create: Distributed_db.net -> t Lwt.t val shutdown: t -> unit Lwt.t -val notify_operation: t -> P2p.Peer_id.t -> Operation_hash.t -> unit +val notify_operations: t -> P2p.Peer_id.t -> Operation_hash.t list -> unit (** Conditionnaly inject a new operation in the node: the operation will be ignored when it is (strongly) refused This is the diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index a7f0edd81..cf23f2b27 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -580,7 +580,7 @@ let rec create_validator ?parent worker state db net = loop () | `Head (gid, head, ops) -> Context_db.prefetch v session head ; - List.iter (Prevalidator.notify_operation prevalidator gid) ops ; + Prevalidator.notify_operations prevalidator gid ops ; loop () in Lwt.catch loop