Mempool_peer_worker: simpler interface
Removes unecessary primitives, types, etc.
This commit is contained in:
parent
1431bcd6c8
commit
6b7031ad3c
@ -31,31 +31,21 @@ type limits = {
|
||||
}
|
||||
|
||||
module type T = sig
|
||||
module Proto: Registered_protocol.T
|
||||
module Mempool_worker: Mempool_worker.T with module Proto = Proto
|
||||
module Mempool_worker: Mempool_worker.T
|
||||
|
||||
type t
|
||||
type input = Operation_hash.t list
|
||||
type result =
|
||||
| Cannot_download of error list
|
||||
| Cannot_parse of error list
|
||||
| Cannot_validate of error list
|
||||
| Mempool_result of Mempool_worker.result
|
||||
type output = result Operation_hash.Map.t
|
||||
|
||||
val create: limits -> P2p_peer.Id.t -> t Lwt.t
|
||||
val shutdown: t -> unit Lwt.t
|
||||
val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t
|
||||
val shutdown: t -> input Lwt.t
|
||||
|
||||
val validate: Mempool_worker.t -> t -> input -> output tzresult Lwt.t
|
||||
|
||||
(* For special use when a bunch of operations is not attributed to a specific
|
||||
* peer worker. E.g., for injecting operations, for recylcing known operations
|
||||
* from a previous protocol, etc. *)
|
||||
val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t
|
||||
val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t
|
||||
|
||||
end
|
||||
|
||||
module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_worker.Proto = struct
|
||||
module Make (Mempool_worker: Mempool_worker.T)
|
||||
: T with module Mempool_worker = Mempool_worker
|
||||
= struct
|
||||
|
||||
(* 0. Prelude: set up base modules and types *)
|
||||
(* See interface file for info if needed. *)
|
||||
@ -295,10 +285,10 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
|
||||
end
|
||||
|
||||
module Types = struct
|
||||
type parameters = unit
|
||||
type state = unit
|
||||
type parameters = Mempool_worker.t
|
||||
type state = { mempool_worker: Mempool_worker.t }
|
||||
type view = unit
|
||||
let view () () = ()
|
||||
let view _ _ = ()
|
||||
let encoding = Data_encoding.unit
|
||||
let pp _ _ = ()
|
||||
end
|
||||
@ -314,7 +304,8 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
|
||||
|
||||
type self = t
|
||||
|
||||
let on_launch _ _ () = Lwt.return_unit
|
||||
let on_launch _ _ mempool_worker =
|
||||
Lwt.return Types.{ mempool_worker }
|
||||
|
||||
let on_request : type a. self -> a Request.t -> a tzresult Lwt.t
|
||||
= fun t (Request.Batch (mempool_worker, os)) ->
|
||||
@ -342,17 +333,40 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
|
||||
(* 4. Public interface: exporting a thin wrapper around workers and work. *)
|
||||
(* See interface file for documentation *)
|
||||
|
||||
let create limits peer_id =
|
||||
Worker.launch table limits.worker_limits peer_id () (module Handlers)
|
||||
let shutdown w = Worker.shutdown w
|
||||
|
||||
let validate mempool_worker t os =
|
||||
Worker.push_request_and_wait t (Request.Batch (mempool_worker, os))
|
||||
>>=? fun (_: output) -> return_unit
|
||||
|
||||
let bypass_peer_workers mempool_worker input =
|
||||
Log.lwt_log_info "Bypassing workers to work on: %a" pp_input input >>= fun () ->
|
||||
Work.work mempool_worker input >>= fun output ->
|
||||
Log.lwt_log_info "Finished work" >>= fun () ->
|
||||
Lwt.return output
|
||||
let create limits peer_id mempool_worker input =
|
||||
Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers) >>= fun w ->
|
||||
validate mempool_worker w input >>= fun (_: unit tzresult) ->
|
||||
(* NOTE: We ignore errors here. The validation of the [input] is only for
|
||||
* recycling purposes and is not essential. *)
|
||||
Lwt.return w
|
||||
|
||||
let shutdown w =
|
||||
let recycled = Operation_hash.Set.empty in
|
||||
let recycled =
|
||||
List.fold_left
|
||||
(fun recycled (_, input) ->
|
||||
List.fold_left
|
||||
(fun recycled op_h -> Operation_hash.Set.add op_h recycled)
|
||||
recycled
|
||||
input)
|
||||
recycled
|
||||
(Worker.pending_requests w)
|
||||
in
|
||||
let recycled =
|
||||
match Worker.current_request w with
|
||||
| Some (_, _, input) ->
|
||||
List.fold_left
|
||||
(fun recycled op_h -> Operation_hash.Set.add op_h recycled)
|
||||
recycled
|
||||
input
|
||||
| None -> recycled
|
||||
in
|
||||
let input = Operation_hash.Set.elements recycled in
|
||||
Worker.shutdown w >>= fun () ->
|
||||
Lwt.return input
|
||||
|
||||
end
|
||||
|
@ -30,8 +30,7 @@ type limits = {
|
||||
}
|
||||
|
||||
module type T = sig
|
||||
module Proto: Registered_protocol.T
|
||||
module Mempool_worker: Mempool_worker.T with module Proto = Proto
|
||||
module Mempool_worker: Mempool_worker.T
|
||||
|
||||
(** The type of a peer worker. Each peer worker should be used for treating
|
||||
* all the operations from a given peer. *)
|
||||
@ -44,45 +43,25 @@ module type T = sig
|
||||
* their validity before gossiping them furhter. *)
|
||||
type input = Operation_hash.t list
|
||||
|
||||
(** [result] are the different possible outcome of the validation of a single
|
||||
* operation. It signals either errors in the validation process, or results
|
||||
* from further down the validation system. *)
|
||||
type result =
|
||||
| Cannot_download of error list
|
||||
| Cannot_parse of error list
|
||||
| Cannot_validate of error list
|
||||
| Mempool_result of Mempool_worker.result
|
||||
(** [create limits peer_id mempool_worker input] creates a peer worker meant
|
||||
* to be used for validating batches of operations sent by the peer [peer_id].
|
||||
* The [mempool_worker] the underlying worker that individual validations of
|
||||
* singular operations are delegated to. The [input[] argument is for recycled
|
||||
* operations that are carried over when the protocol updates. *)
|
||||
val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t
|
||||
|
||||
(** [output] are the outcome of the validation of a batch of operations. *)
|
||||
type output = result Operation_hash.Map.t
|
||||
|
||||
(** [create limits peer_id] creates a peer worker meant to be used for
|
||||
* validating operations sent by the peer [peer_id]. *)
|
||||
val create: limits -> P2p_peer.Id.t -> t Lwt.t
|
||||
|
||||
(** [shutdown t] closes the peer worker [t]. *)
|
||||
val shutdown: t -> unit Lwt.t
|
||||
(** [shutdown t] closes the peer worker [t]. It returns a list of operation
|
||||
* hashes that can be recycled when a new worker is created for the same peer.
|
||||
* *)
|
||||
val shutdown: t -> input Lwt.t
|
||||
|
||||
(** [validate mempool_worker worker input] validates the batch of operations
|
||||
* [input]. The work is performed by [worker] and the underlying validation of
|
||||
* each operation is performed by [mempool_worker]. *)
|
||||
val validate: Mempool_worker.t -> t -> input -> output tzresult Lwt.t
|
||||
|
||||
(** [bypass_peer_workers mempool_worker input] validates the batch of
|
||||
* operations [input]. Unlike [validate] above, the work is not performed by a
|
||||
* specific worker.
|
||||
*
|
||||
* This is intended to be used for cases where the [input] cannot be
|
||||
* attributed to a specific peer. Typically, this happens when injecting an
|
||||
* operation from the local client, or when recycling pending operations after
|
||||
* a protocol change.
|
||||
*
|
||||
* Note that, unlike [validate], this bypasses the worker mechanics entirely.
|
||||
* As a result, there is no possible introspection and the work from spearate
|
||||
* calls to [bypass_peer_workers] is not sequentialised. *)
|
||||
val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t
|
||||
val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t
|
||||
|
||||
end
|
||||
|
||||
|
||||
module Make (Mempool_worker : Mempool_worker.T) : T with module Proto = Mempool_worker.Proto
|
||||
module Make (Mempool_worker : Mempool_worker.T)
|
||||
: T with module Mempool_worker = Mempool_worker
|
||||
|
Loading…
Reference in New Issue
Block a user