Shell: no duplicates in Prevalidation

This commit is contained in:
Grégoire Henry 2017-03-04 09:53:44 +01:00
parent c396cd4dec
commit 8453a69e0b
3 changed files with 34 additions and 9 deletions

View File

@ -74,7 +74,7 @@ exception Invalid_operation of Operation_hash.t
type t = { type t = {
net_db: Distributed_db.net ; net_db: Distributed_db.net ;
flush: State.Valid_block.t -> unit; flush: State.Valid_block.t -> unit;
notify_operation: P2p.Peer_id.t -> Operation_hash.t -> unit ; notify_operations: P2p.Peer_id.t -> Operation_hash.t list -> unit ;
prevalidate_operations: prevalidate_operations:
bool -> Store.Operation.t list -> bool -> Store.Operation.t list ->
(Operation_hash.t list * error Updater.preapply_result) tzresult Lwt.t ; (Operation_hash.t list * error Updater.preapply_result) tzresult Lwt.t ;
@ -111,6 +111,8 @@ let create net_db =
| Error _ -> ref head.context | Error _ -> ref head.context
| Ok (ctxt, _) -> ref ctxt | Ok (ctxt, _) -> ref ctxt
end >>= fun context -> end >>= fun context ->
let pending = Operation_hash.Table.create 53 in
let protocol = ref protocol in let protocol = ref protocol in
let head = ref head in let head = ref head in
let operations = ref Updater.empty_result in let operations = ref Updater.empty_result in
@ -263,10 +265,34 @@ let create net_db =
Lwt.wakeup w result ; Lwt.wakeup w result ;
Lwt.return_unit Lwt.return_unit
end end
| `Register op -> | `Register (gid, ops) ->
Lwt_list.filter_p
(fun op ->
Distributed_db.Operation.known net_db op >|= not)
ops >>= fun new_ops ->
let known_ops, unknown_ops =
List.partition
(fun op -> Operation_hash.Table.mem pending op) new_ops in
let fetch op =
Distributed_db.Operation.fetch
net_db ~peer:gid op >>= fun _op ->
push_to_worker (`Handle op) ;
Lwt.return_unit
in
List.iter
(fun op -> Operation_hash.Table.add pending op (fetch op))
unknown_ops ;
List.iter (fun op ->
Lwt.ignore_result
(Distributed_db.Operation.fetch net_db ~peer:gid op))
known_ops ;
Lwt.return_unit
| `Handle op ->
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
Operation_hash.Table.remove pending op ;
broadcast_unprocessed := true ; broadcast_unprocessed := true ;
unprocessed := Operation_hash.Set.singleton op ; unprocessed := Operation_hash.Set.singleton op ;
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
Lwt.return_unit Lwt.return_unit
| `Flush (new_head : State.Valid_block.t) -> | `Flush (new_head : State.Valid_block.t) ->
let new_protocol = let new_protocol =
@ -305,10 +331,9 @@ let create net_db =
if not (Lwt.is_sleeping !running_validation) then if not (Lwt.is_sleeping !running_validation) then
Lwt.cancel !running_validation Lwt.cancel !running_validation
in in
let notify_operation gid op = let notify_operations gid ops =
Lwt.async begin fun () -> Lwt.async begin fun () ->
Distributed_db.Operation.fetch net_db ~peer:gid op >>= fun _ -> push_to_worker (`Register (gid, ops)) ;
push_to_worker (`Register op) ;
Lwt.return_unit Lwt.return_unit
end in end in
let prevalidate_operations force raw_ops = let prevalidate_operations force raw_ops =
@ -339,7 +364,7 @@ let create net_db =
Lwt.return { Lwt.return {
net_db ; net_db ;
flush ; flush ;
notify_operation ; notify_operations ;
prevalidate_operations ; prevalidate_operations ;
operations = operations =
(fun () -> (fun () ->
@ -353,7 +378,7 @@ let create net_db =
} }
let flush pv head = pv.flush head let flush pv head = pv.flush head
let notify_operation pv = pv.notify_operation let notify_operations pv = pv.notify_operations
let prevalidate_operations pv = pv.prevalidate_operations let prevalidate_operations pv = pv.prevalidate_operations
let operations pv = pv.operations () let operations pv = pv.operations ()
let pending ?block pv = pv.pending ?block () let pending ?block pv = pv.pending ?block ()

View File

@ -32,7 +32,7 @@ type t
val create: Distributed_db.net -> t Lwt.t val create: Distributed_db.net -> t Lwt.t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
val notify_operation: t -> P2p.Peer_id.t -> Operation_hash.t -> unit val notify_operations: t -> P2p.Peer_id.t -> Operation_hash.t list -> unit
(** Conditionnaly inject a new operation in the node: the operation will (** Conditionnaly inject a new operation in the node: the operation will
be ignored when it is (strongly) refused This is the be ignored when it is (strongly) refused This is the

View File

@ -580,7 +580,7 @@ let rec create_validator ?parent worker state db net =
loop () loop ()
| `Head (gid, head, ops) -> | `Head (gid, head, ops) ->
Context_db.prefetch v session head ; Context_db.prefetch v session head ;
List.iter (Prevalidator.notify_operation prevalidator gid) ops ; Prevalidator.notify_operations prevalidator gid ops ;
loop () loop ()
in in
Lwt.catch loop Lwt.catch loop