Node: switch the prevalidator to Tezos_worker

This commit is contained in:
Benjamin Canou 2017-11-30 18:34:22 +01:00 committed by Grégoire Henry
parent d50402c27b
commit 98ec3393b6
25 changed files with 638 additions and 360 deletions

View File

@ -8,6 +8,7 @@
tezos-node-updater tezos-node-updater
tezos-node-p2p-base tezos-node-p2p-base
tezos-node-p2p tezos-node-p2p
tezos-node-shell-base
tezos-node-shell tezos-node-shell
tezos-embedded-protocol-genesis tezos-embedded-protocol-genesis
tezos-embedded-protocol-demo tezos-embedded-protocol-demo
@ -21,6 +22,7 @@
-open Tezos_node_updater -open Tezos_node_updater
-open Tezos_node_p2p_base -open Tezos_node_p2p_base
-open Tezos_node_p2p -open Tezos_node_p2p
-open Tezos_node_shell_base
-open Tezos_node_shell -open Tezos_node_shell
-linkall)))) -linkall))))

View File

@ -108,6 +108,12 @@ let default_shell = {
prevalidator_limits = { prevalidator_limits = {
operation_timeout = 10. ; operation_timeout = 10. ;
max_refused_operations = 1000 ; max_refused_operations = 1000 ;
worker_limits = {
backlog_size = 1000 ;
backlog_level = Logging.Info ;
zombie_lifetime = 600. ;
zombie_memory = 120. ;
}
} ; } ;
timeout = { timeout = {
block_header = 60. ; block_header = 60. ;
@ -257,19 +263,45 @@ let log =
(opt "rules" string) (opt "rules" string)
(dft "template" string default_log.template)) (dft "template" string default_log.template))
let worker_limits_encoding
default_size
default_level
default_zombie_lifetime
default_zombie_memory =
let open Data_encoding in
conv
(fun { Worker_types.backlog_size ; backlog_level ; zombie_lifetime ; zombie_memory } ->
(backlog_size, backlog_level, zombie_lifetime, zombie_memory))
(fun (backlog_size, backlog_level, zombie_lifetime, zombie_memory) ->
{ backlog_size ; backlog_level ; zombie_lifetime ; zombie_memory })
(obj4
(dft "worker_backlog_size" uint16 default_size)
(dft "worker_backlog_level" Logging.level_encoding default_level)
(dft "worker_zombie_lifetime" float default_zombie_lifetime)
(dft "worker_zombie_memory" float default_zombie_memory))
let timeout_encoding =
Data_encoding.ranged_float 0. 500.
let prevalidator_limits_encoding = let prevalidator_limits_encoding =
let open Data_encoding in let open Data_encoding in
let uint8 = conv int_of_float float_of_int uint8 in
conv conv
(fun { Node.operation_timeout ; max_refused_operations } -> (fun { Node.operation_timeout ; max_refused_operations ; worker_limits } ->
(operation_timeout, max_refused_operations)) ((operation_timeout, max_refused_operations), worker_limits))
(fun (operation_timeout, max_refused_operations) -> (fun ((operation_timeout, max_refused_operations), worker_limits) ->
{ operation_timeout ; max_refused_operations }) { operation_timeout ; max_refused_operations ; worker_limits})
(obj2 (merge_objs
(dft "operations_timeout" uint8 (obj2
default_shell.prevalidator_limits.operation_timeout) (dft "operations_request_timeout" timeout_encoding
(dft "max_refused_operations" uint16 default_shell.prevalidator_limits.operation_timeout)
default_shell.prevalidator_limits.max_refused_operations)) (dft "max_refused_operations" uint16
default_shell.prevalidator_limits.max_refused_operations))
(worker_limits_encoding
default_shell.prevalidator_limits.worker_limits.backlog_size
default_shell.prevalidator_limits.worker_limits.backlog_level
default_shell.prevalidator_limits.worker_limits.zombie_lifetime
default_shell.prevalidator_limits.worker_limits.zombie_memory))
let timeout_encoding = let timeout_encoding =
let open Data_encoding in let open Data_encoding in

View File

@ -7,6 +7,7 @@
tezos-storage tezos-storage
tezos-rpc-http tezos-rpc-http
tezos-node-p2p-base tezos-node-p2p-base
tezos-node-shell-base
tezos-node-services tezos-node-services
tezos-node-updater tezos-node-updater
tezos-protocol-compiler)) tezos-protocol-compiler))

View File

@ -4,11 +4,13 @@
((name tezos_node_services) ((name tezos_node_services)
(public_name tezos-node-services) (public_name tezos-node-services)
(libraries (tezos-base (libraries (tezos-base
tezos-node-p2p-base)) tezos-node-p2p-base
tezos-node-shell-base))
(flags (:standard -w -9+27-30-32-40@8 (flags (:standard -w -9+27-30-32-40@8
-safe-string -safe-string
-open Tezos_base__TzPervasives -open Tezos_base__TzPervasives
-open Tezos_node_p2p_base)))) -open Tezos_node_p2p_base
-open Tezos_node_shell_base))))
(alias (alias
((name runtest_indent) ((name runtest_indent)

View File

@ -10,7 +10,7 @@
open Logging.Node.State open Logging.Node.State
open State open State
let mempool_encoding = State.mempool_encoding let mempool_encoding = Mempool.encoding
let genesis net_state = let genesis net_state =
let genesis = Net.genesis net_state in let genesis = Net.genesis net_state in
@ -37,7 +37,7 @@ let mem net_state hash =
type data = State.chain_data = { type data = State.chain_data = {
current_head: Block.t ; current_head: Block.t ;
current_mempool: mempool ; current_mempool: Mempool.t ;
live_blocks: Block_hash.Set.t ; live_blocks: Block_hash.Set.t ;
live_operations: Operation_hash.Set.t ; live_operations: Operation_hash.Set.t ;
locator: Block_locator.t Lwt.t lazy_t ; locator: Block_locator.t Lwt.t lazy_t ;
@ -86,7 +86,7 @@ let locked_set_head net_state chain_store data block =
block (State.Block.max_operations_ttl block) >>= fun (live_blocks, block (State.Block.max_operations_ttl block) >>= fun (live_blocks,
live_operations) -> live_operations) ->
Lwt.return { current_head = block ; Lwt.return { current_head = block ;
current_mempool = State.empty_mempool ; current_mempool = Mempool.empty ;
live_blocks ; live_blocks ;
live_operations ; live_operations ;
locator = lazy (State.compute_locator net_state block) ; locator = lazy (State.compute_locator net_state block) ;

View File

@ -22,7 +22,7 @@ val locator: Net.t -> Block_locator.t Lwt.t
(** All the available chain data. *) (** All the available chain data. *)
type data = { type data = {
current_head: Block.t ; current_head: Block.t ;
current_mempool: mempool ; current_mempool: Mempool.t ;
live_blocks: Block_hash.Set.t ; live_blocks: Block_hash.Set.t ;
live_operations: Operation_hash.Set.t ; live_operations: Operation_hash.Set.t ;
locator: Block_locator.t Lwt.t lazy_t ; locator: Block_locator.t Lwt.t lazy_t ;

View File

@ -491,7 +491,7 @@ module P2p_reader = struct
| Get_current_head net_id -> | Get_current_head net_id ->
may_handle state net_id @@ fun net_db -> may_handle state net_id @@ fun net_db ->
Mempool.get net_db.net_state >>= fun (head, mempool) -> State.Current_mempool.get net_db.net_state >>= fun (head, mempool) ->
(* TODO bound the sent mempool size *) (* TODO bound the sent mempool size *)
ignore ignore
@@ P2p.try_send global_db.p2p state.conn @@ P2p.try_send global_db.p2p state.conn

View File

@ -8,6 +8,7 @@
tezos-rpc-http tezos-rpc-http
tezos-node-services tezos-node-services
tezos-node-p2p-base tezos-node-p2p-base
tezos-node-shell-base
tezos-node-p2p tezos-node-p2p
tezos-node-updater)) tezos-node-updater))
(flags (:standard -w -9+27-30-32-40@8 (flags (:standard -w -9+27-30-32-40@8
@ -17,6 +18,7 @@
-open Tezos_rpc_http -open Tezos_rpc_http
-open Tezos_node_services -open Tezos_node_services
-open Tezos_node_p2p_base -open Tezos_node_p2p_base
-open Tezos_node_shell_base
-open Tezos_node_p2p -open Tezos_node_p2p
-open Tezos_node_updater)))) -open Tezos_node_updater))))

View File

@ -207,7 +207,7 @@ and worker_loop nv =
else begin else begin
Chain.set_head nv.net_state block >>= fun previous -> Chain.set_head nv.net_state block >>= fun previous ->
broadcast_head nv ~previous block >>= fun () -> broadcast_head nv ~previous block >>= fun () ->
Prevalidator.flush nv.prevalidator block ; (* FIXME *) Prevalidator.flush nv.prevalidator block_hash >>=? fun () ->
may_switch_test_network nv block >>= fun () -> may_switch_test_network nv block >>= fun () ->
Lwt_watcher.notify nv.new_head_input block ; Lwt_watcher.notify nv.new_head_input block ;
lwt_log_notice "update current head %a %a %a(%t)" lwt_log_notice "update current head %a %a %a(%t)"

View File

@ -98,7 +98,8 @@ and timeout = Net_validator.timeout = {
and prevalidator_limits = Prevalidator.limits = { and prevalidator_limits = Prevalidator.limits = {
max_refused_operations: int ; max_refused_operations: int ;
operation_timeout: float operation_timeout: float ;
worker_limits : Worker_types.limits ;
} }
let may_create_net state genesis = let may_create_net state genesis =

View File

@ -27,7 +27,8 @@ and timeout = {
} }
and prevalidator_limits = { and prevalidator_limits = {
max_refused_operations: int ; max_refused_operations: int ;
operation_timeout: float operation_timeout: float ;
worker_limits : Worker_types.limits ;
} }
val create: config -> timeout -> prevalidator_limits -> t tzresult Lwt.t val create: config -> timeout -> prevalidator_limits -> t tzresult Lwt.t

View File

@ -7,7 +7,78 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
open Logging.Node.Prevalidator open Prevalidator_worker_state
type limits = {
max_refused_operations : int ;
operation_timeout : float ;
worker_limits : Worker_types.limits ;
}
module Name = struct
type t = Net_id.t
let encoding = Net_id.encoding
let base = [ "prevalidator" ]
let pp = Net_id.pp_short
end
module Types = struct
(* Invariants:
- an operation is in only one of these sets (map domains):
pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool
- pv.in_mempool is the domain of all fields of pv.prevalidation_result
- pv.prevalidation_result.refused = Ø, refused ops are in pv.refused
- the 'applied' operations in pv.validation_result are in reverse order. *)
type state = {
net_db : Distributed_db.net_db ;
limits : limits ;
mutable predecessor : State.Block.t ;
mutable timestamp : Time.t ;
mutable live_blocks : Block_hash.Set.t ; (* just a cache *)
mutable live_operations : Operation_hash.Set.t ; (* just a cache *)
refused : Operation_hash.t Ring.t ;
mutable refusals : error list Operation_hash.Map.t ;
mutable fetching : Operation_hash.Set.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.prevalidation_state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ;
}
type parameters = limits * Distributed_db.net_db
include Worker_state
let view (state : state) _ : view =
let domain map =
Operation_hash.Map.fold
(fun elt _ acc -> Operation_hash.Set.add elt acc)
map Operation_hash.Set.empty in
{ head = State.Block.hash state.predecessor ;
timestamp = state.timestamp ;
fetching = state.fetching ;
pending = domain state.pending ;
applied =
List.rev
(List.map (fun (h, _) -> h)
state.validation_result.applied) ;
delayed =
Operation_hash.Set.union
(domain state.validation_result.branch_delayed)
(domain state.validation_result.branch_refused) }
end
module Worker = Worker.Make (Name) (Event) (Request) (Types)
open Types
type t = Worker.infinite Worker.queue Worker.t
type error += Closed = Worker.Closed
let debug w =
Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool =
let rec pop_blocks ancestor block mempool = let rec pop_blocks ancestor block mempool =
@ -48,103 +119,6 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool =
Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool -> Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool ->
Lwt.return new_mempool Lwt.return new_mempool
type 'a request =
| Flush : State.Block.t -> unit request
| Notify : P2p.Peer_id.t * Mempool.t -> unit request
| Inject : Operation.t -> unit tzresult request
| Arrived : Operation_hash.t * Operation.t -> unit request
| Advertise : unit request
type message = Message: 'a request * 'a tzresult Lwt.u option -> message
let wakeup_with_result
: type t.
t request ->
t tzresult Lwt.u option ->
(t request -> t tzresult Lwt.t) ->
unit tzresult Lwt.t
= fun req u cb -> match u with
| None ->
cb req >>=? fun _res -> return ()
| Some u ->
cb req >>= fun res ->
Lwt.wakeup_later u res ;
Lwt.return (res >>? fun _res -> ok ())
type limits = {
max_refused_operations : int ;
operation_timeout : float
}
(* Invariants:
- an operation is in only one of these sets (map domains):
pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool
- pv.in_mempool is the domain of all fields of pv.prevalidation_result
- pv.prevalidation_result.refused = Ø, refused ops are in pv.refused *)
type t = {
net_db : Distributed_db.net_db ;
limits : limits ;
canceler : Lwt_canceler.t ;
message_queue : message Lwt_pipe.t ;
mutable (* just for init *) worker : unit Lwt.t ;
mutable predecessor : State.Block.t ;
mutable timestamp : Time.t ;
mutable live_blocks : Block_hash.Set.t ; (* just a cache *)
mutable live_operations : Operation_hash.Set.t ; (* just a cache *)
refused : Operation_hash.t Ring.t ;
mutable refusals : error list Operation_hash.Map.t ;
mutable fetching : Operation_hash.Set.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.prevalidation_state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ;
}
type error += Closed of Net_id.t
let () =
register_error_kind `Permanent
~id:"prevalidator.closed"
~title:"Prevalidator closed"
~description:
"An operation on the prevalidator could not complete \
before the prevalidator was shut down."
~pp: (fun ppf net_id ->
Format.fprintf ppf
"Prevalidator for network %a has been shut down."
Net_id.pp_short net_id)
Data_encoding.(obj1 (req "net_id" Net_id.encoding))
(function Closed net_id -> Some net_id | _ -> None)
(fun net_id -> Closed net_id)
let push_request pv request =
Lwt_pipe.safe_push_now pv.message_queue (Message (request, None))
let push_request_and_wait pv request =
let t, u = Lwt.wait () in
Lwt.catch
(fun () ->
Lwt_pipe.push_now_exn pv.message_queue (Message (request, Some u)) ;
t)
(function
| Lwt_pipe.Closed ->
let net_id = (State.Net.id (Distributed_db.net_state pv.net_db)) in
fail (Closed net_id)
| exn -> fail (Exn exn))
let close_queue pv =
let messages = Lwt_pipe.pop_all_now pv.message_queue in
List.iter
(function
| Message (_, Some u) ->
let net_id = (State.Net.id (Distributed_db.net_state pv.net_db)) in
Lwt.wakeup_later u (Error [ Closed net_id ])
| _ -> ())
messages ;
Lwt_pipe.close pv.message_queue
let already_handled pv oph = let already_handled pv oph =
Operation_hash.Map.mem oph pv.refusals Operation_hash.Map.mem oph pv.refusals
|| Operation_hash.Map.mem oph pv.pending || Operation_hash.Map.mem oph pv.pending
@ -153,7 +127,7 @@ let already_handled pv oph =
|| Operation_hash.Set.mem oph pv.in_mempool || Operation_hash.Set.mem oph pv.in_mempool
let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t =
{ Mempool.known_valid = fst (List.split r.applied) ; { Mempool.known_valid = List.map fst r.applied ;
pending = pending =
Operation_hash.Map.fold Operation_hash.Map.fold
(fun k _ s -> Operation_hash.Set.add k s) (fun k _ s -> Operation_hash.Set.add k s)
@ -184,7 +158,7 @@ let merge_validation_results ~old ~neu =
(filter_out neu.applied old.branch_delayed) (filter_out neu.applied old.branch_delayed)
neu.branch_delayed } neu.branch_delayed }
let advertise pv mempool = let advertise (w : t) pv mempool =
match pv.advertisement with match pv.advertisement with
| `Pending { Mempool.known_valid ; pending } -> | `Pending { Mempool.known_valid ; pending } ->
pv.advertisement <- pv.advertisement <-
@ -195,10 +169,10 @@ let advertise pv mempool =
pv.advertisement <- `Pending mempool ; pv.advertisement <- `Pending mempool ;
Lwt.async (fun () -> Lwt.async (fun () ->
Lwt_unix.sleep 0.01 >>= fun () -> Lwt_unix.sleep 0.01 >>= fun () ->
push_request pv Advertise ; Worker.push_request_now w Advertise ;
Lwt.return_unit) Lwt.return_unit)
let handle_unprocessed pv = let handle_unprocessed w pv =
begin match pv.validation_state with begin match pv.validation_state with
| Error err -> | Error err ->
pv.validation_result <- pv.validation_result <-
@ -211,49 +185,45 @@ let handle_unprocessed pv =
Operation_hash.Map.empty ; Operation_hash.Map.empty ;
Lwt.return () Lwt.return ()
| Ok validation_state -> | Ok validation_state ->
if Operation_hash.Map.is_empty pv.pending then match Operation_hash.Map.cardinal pv.pending with
Lwt.return () | 0 -> Lwt.return ()
else | n -> debug w "processing %d operations" n ;
begin match Operation_hash.Map.cardinal pv.pending with Prevalidation.prevalidate validation_state ~sort:true
| 0 -> Lwt.return () (Operation_hash.Map.bindings pv.pending)
| n -> lwt_debug "processing %d operations" n >>= fun (validation_state, validation_result) ->
end >>= fun () -> pv.validation_state <-
Prevalidation.prevalidate validation_state ~sort:true Ok validation_state ;
(Operation_hash.Map.bindings pv.pending) pv.in_mempool <-
>>= fun (validation_state, validation_result) -> (Operation_hash.Map.fold
pv.validation_state <- (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool)
Ok validation_state ; pv.pending @@
pv.in_mempool <- Operation_hash.Map.fold
(Operation_hash.Map.fold (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool)
(fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) pv.validation_result.refused @@
pv.pending @@ pv.in_mempool) ;
Operation_hash.Map.fold Operation_hash.Map.iter
(fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) (fun h (_, errs) ->
pv.validation_result.refused @@ Option.iter (Ring.add_and_return_erased pv.refused h)
pv.in_mempool) ; ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ;
Operation_hash.Map.iter pv.refusals <-
(fun h (_, errs) -> Operation_hash.Map.add h errs pv.refusals)
Option.iter (Ring.add_and_return_erased pv.refused h) pv.validation_result.refused ;
~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; Operation_hash.Map.iter
pv.refusals <- (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph)
Operation_hash.Map.add h errs pv.refusals) pv.validation_result.refused ;
pv.validation_result.refused ; pv.validation_result <-
Operation_hash.Map.iter merge_validation_results
(fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph) ~old:pv.validation_result
pv.validation_result.refused ; ~neu:validation_result ;
pv.validation_result <- pv.pending <-
merge_validation_results Operation_hash.Map.empty ;
~old:pv.validation_result advertise w pv
~neu:validation_result ; (mempool_of_prevalidation_result validation_result) ;
pv.pending <- Lwt.return ()
Operation_hash.Map.empty ;
advertise pv
(mempool_of_prevalidation_result validation_result) ;
Lwt.return ()
end >>= fun () -> end >>= fun () ->
pv.mempool <- pv.mempool <-
{ Mempool.known_valid = { Mempool.known_valid =
fst (List.split pv.validation_result.applied) ; List.rev_map fst pv.validation_result.applied ;
pending = pending =
Operation_hash.Map.fold Operation_hash.Map.fold
(fun k _ s -> Operation_hash.Set.add k s) (fun k _ s -> Operation_hash.Set.add k s)
@ -262,33 +232,29 @@ let handle_unprocessed pv =
(fun k _ s -> Operation_hash.Set.add k s) (fun k _ s -> Operation_hash.Set.add k s)
pv.validation_result.branch_refused @@ pv.validation_result.branch_refused @@
Operation_hash.Set.empty } ; Operation_hash.Set.empty } ;
Mempool.set (Distributed_db.net_state pv.net_db) State.Current_mempool.set (Distributed_db.net_state pv.net_db)
~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () ->
Lwt.return () Lwt.return ()
let fetch_operation pv ?peer oph = let fetch_operation w pv ?peer oph =
debug "fetching operation %a" Operation_hash.pp_short oph ; debug w
"fetching operation %a"
Operation_hash.pp_short oph ;
Distributed_db.Operation.fetch Distributed_db.Operation.fetch
~timeout:pv.limits.operation_timeout ~timeout:pv.limits.operation_timeout
pv.net_db ?peer oph () >>= function pv.net_db ?peer oph () >>= function
| Ok op -> | Ok op ->
push_request pv (Arrived (oph, op)) ; Worker.push_request_now w (Arrived (oph, op)) ;
Lwt.return_unit Lwt.return_unit
| Error [ Distributed_db.Operation.Canceled _ ] -> | Error [ Distributed_db.Operation.Canceled _ ] ->
lwt_debug debug w
"operation %a included before being prevalidated" "operation %a included before being prevalidated"
Operation_hash.pp_short oph >>= fun () -> Operation_hash.pp_short oph ;
Lwt.return_unit Lwt.return_unit
| Error _ -> (* should not happen *) | Error _ -> (* should not happen *)
Lwt.return_unit Lwt.return_unit
let clear_fetching pv = let on_operation_arrived (pv : state) oph op =
Operation_hash.Set.iter
(Distributed_db.Operation.clear_or_cancel pv.net_db)
pv.fetching
let on_operation_arrived pv oph op =
debug "operation %a retrieved" Operation_hash.pp_short oph ;
pv.fetching <- Operation_hash.Set.remove oph pv.fetching ; pv.fetching <- Operation_hash.Set.remove oph pv.fetching ;
if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin
Distributed_db.Operation.clear_or_cancel pv.net_db oph Distributed_db.Operation.clear_or_cancel pv.net_db oph
@ -299,43 +265,40 @@ let on_operation_arrived pv oph op =
let on_inject pv op = let on_inject pv op =
let oph = Operation.hash op in let oph = Operation.hash op in
log_notice "injection of operation %a" Operation_hash.pp_short oph ;
begin begin
begin if already_handled pv oph then if already_handled pv oph then
return pv.validation_result return pv.validation_result
else
Lwt.return pv.validation_state >>=? fun validation_state ->
Prevalidation.prevalidate
validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) ->
match result.applied with
| [ app_oph, _ ] when Operation_hash.equal app_oph oph ->
Distributed_db.inject_operation pv.net_db oph op >>= fun (_ : bool) ->
pv.pending <- Operation_hash.Map.add oph op pv.pending ;
return result
| _ ->
return result
end >>=? fun result ->
if List.mem_assoc oph result.applied then
return ()
else else
let try_in_map map proj or_else = Lwt.return pv.validation_state >>=? fun validation_state ->
try Prevalidation.prevalidate
Lwt.return (Error (proj (Operation_hash.Map.find oph map))) validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) ->
with Not_found -> or_else () in match result.applied with
try_in_map pv.refusals (fun h -> h) @@ fun () -> | [ app_oph, _ ] when Operation_hash.equal app_oph oph ->
try_in_map result.refused snd @@ fun () -> Distributed_db.inject_operation pv.net_db oph op >>= fun (_ : bool) ->
try_in_map result.branch_refused snd @@ fun () -> pv.pending <- Operation_hash.Map.add oph op pv.pending ;
try_in_map result.branch_delayed snd @@ fun () -> return result
if Operation_hash.Set.mem oph pv.live_operations then | _ ->
failwith "Injected operation %a included in a previous block." return result
Operation_hash.pp oph end >>=? fun result ->
else if List.mem_assoc oph result.applied then
failwith "Injected operation %a is not in prevalidation result." return ()
Operation_hash.pp oph else
end >>= fun tzresult -> let try_in_map map proj or_else =
return tzresult try
Lwt.return (Error (proj (Operation_hash.Map.find oph map)))
with Not_found -> or_else () in
try_in_map pv.refusals (fun h -> h) @@ fun () ->
try_in_map result.refused snd @@ fun () ->
try_in_map result.branch_refused snd @@ fun () ->
try_in_map result.branch_delayed snd @@ fun () ->
if Operation_hash.Set.mem oph pv.live_operations then
failwith "Injected operation %a included in a previous block."
Operation_hash.pp oph
else
failwith "Injected operation %a is not in prevalidation result."
Operation_hash.pp oph
let on_notify pv peer mempool = let on_notify w pv peer mempool =
let all_ophs = let all_ophs =
List.fold_left List.fold_left
(fun s oph -> Operation_hash.Set.add oph s) (fun s oph -> Operation_hash.Set.add oph s)
@ -344,16 +307,15 @@ let on_notify pv peer mempool =
Operation_hash.Set.filter Operation_hash.Set.filter
(fun oph -> not (already_handled pv oph)) (fun oph -> not (already_handled pv oph))
all_ophs in all_ophs in
debug "notification of %d new operations" (Operation_hash.Set.cardinal to_fetch) ;
pv.fetching <- pv.fetching <-
Operation_hash.Set.union Operation_hash.Set.union
to_fetch to_fetch
pv.fetching ; pv.fetching ;
Operation_hash.Set.iter Operation_hash.Set.iter
(fun oph -> Lwt.ignore_result (fetch_operation ~peer pv oph)) (fun oph -> Lwt.ignore_result (fetch_operation w pv ~peer oph))
to_fetch to_fetch
let on_flush pv predecessor = let on_flush w pv predecessor =
list_pendings list_pendings
~maintain_net_db:pv.net_db ~maintain_net_db:pv.net_db
~from_block:pv.predecessor ~to_block:predecessor ~from_block:pv.predecessor ~to_block:predecessor
@ -372,9 +334,8 @@ let on_flush pv predecessor =
validation_state ~sort:false [] >>= fun (state, result) -> validation_state ~sort:false [] >>= fun (state, result) ->
Lwt.return (Ok state, result) Lwt.return (Ok state, result)
end >>= fun (validation_state, validation_result) -> end >>= fun (validation_state, validation_result) ->
lwt_log_notice "flushing the mempool for new head %a (%d operations)" debug w "%d operations were not washed by the flush"
Block_hash.pp_short (State.Block.hash predecessor) (Operation_hash.Map.cardinal pending) ;
(Operation_hash.Map.cardinal pending) >>= fun () ->
pv.predecessor <- predecessor ; pv.predecessor <- predecessor ;
pv.live_blocks <- new_live_blocks ; pv.live_blocks <- new_live_blocks ;
pv.live_operations <- new_live_operations ; pv.live_operations <- new_live_operations ;
@ -393,48 +354,43 @@ let on_advertise pv =
pv.advertisement <- `None ; pv.advertisement <- `None ;
Distributed_db.Advertise.current_head pv.net_db ~mempool pv.predecessor Distributed_db.Advertise.current_head pv.net_db ~mempool pv.predecessor
let rec worker_loop pv = let on_request
begin : type r. t -> r Request.t -> r tzresult Lwt.t
handle_unprocessed pv >>= fun () -> = fun w request ->
Lwt_utils.protect ~canceler:pv.canceler begin fun () -> let pv = Worker.state w in
Lwt_pipe.pop pv.message_queue >>= return begin match request with
end >>=? fun (Message (message, u)) -> | Request.Flush hash ->
wakeup_with_result message u @@ function on_advertise pv ;
| Flush block -> (* TODO: rebase the advertisement instead *)
on_advertise pv ; let net_state = Distributed_db.net_state pv.net_db in
(* TODO: rebase the advertisement instead *) State.Block.read net_state hash >>=? fun block ->
on_flush pv block >>=? fun () -> on_flush w pv block >>=? fun () ->
return () return (() : r)
| Notify (peer, mempool) -> | Request.Notify (peer, mempool) ->
on_notify pv peer mempool ; on_notify w pv peer mempool ;
return () return ()
| Inject op -> | Request.Inject op ->
on_inject pv op on_inject pv op >>= fun tzresult ->
| Arrived (oph, op) -> return tzresult
on_operation_arrived pv oph op ; | Request.Arrived (oph, op) ->
return () on_operation_arrived pv oph op ;
| Advertise -> return ()
on_advertise pv ; | Request.Advertise ->
return () on_advertise pv ;
end >>= function return ()
| Ok () -> end >>=? fun r ->
worker_loop pv handle_unprocessed w pv >>= fun () ->
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> return r
clear_fetching pv ;
close_queue pv ;
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error:@ %a@]"
pp_print_error err >>= fun () ->
close_queue pv ;
clear_fetching pv ;
Lwt_canceler.cancel pv.canceler >>= fun () ->
Lwt.return_unit
let create limits net_db = let on_close w =
let pv = Worker.state w in
Operation_hash.Set.iter
(Distributed_db.Operation.clear_or_cancel pv.net_db)
pv.fetching ;
Lwt.return_unit
let on_launch w _ (limits, net_db) =
let net_state = Distributed_db.net_state net_db in let net_state = Distributed_db.net_state net_db in
let canceler = Lwt_canceler.create () in
let message_queue = Lwt_pipe.create () in
State.read_chain_store net_state State.read_chain_store net_state
(fun _ { current_head ; current_mempool ; live_blocks ; live_operations } -> (fun _ { current_head ; current_mempool ; live_blocks ; live_operations } ->
Lwt.return (current_head, current_mempool, live_blocks, live_operations)) Lwt.return (current_head, current_mempool, live_blocks, live_operations))
@ -455,8 +411,7 @@ let create limits net_db =
(fun s h -> Operation_hash.Set.add h s) (fun s h -> Operation_hash.Set.add h s)
Operation_hash.Set.empty mempool.known_valid in Operation_hash.Set.empty mempool.known_valid in
let pv = let pv =
{ limits ; net_db ; canceler ; { limits ; net_db ;
worker = Lwt.return_unit ; message_queue ;
predecessor ; timestamp ; live_blocks ; live_operations ; predecessor ; timestamp ; live_blocks ; live_operations ;
mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; mempool = { known_valid = [] ; pending = Operation_hash.Set.empty };
refused = Ring.create limits.max_refused_operations ; refused = Ring.create limits.max_refused_operations ;
@ -467,32 +422,52 @@ let create limits net_db =
validation_result ; validation_state ; validation_result ; validation_state ;
advertisement = `None } in advertisement = `None } in
List.iter List.iter
(fun oph -> Lwt.ignore_result (fetch_operation pv oph)) (fun oph -> Lwt.ignore_result (fetch_operation w pv oph))
mempool.known_valid ; mempool.known_valid ;
pv.worker <-
Lwt_utils.worker
(Format.asprintf "net_prevalidator.%a" Net_id.pp (State.Net.id net_state))
~run:(fun () -> worker_loop pv)
~cancel:(fun () -> Lwt_canceler.cancel pv.canceler) ;
Lwt.return pv Lwt.return pv
let shutdown pv = let on_error w r st errs =
lwt_debug "shutdown" >>= fun () -> Worker.record_event w (Event.Request (r, st, Some errs)) ;
Lwt_canceler.cancel pv.canceler >>= fun () -> Lwt.return (Error errs)
pv.worker
let flush pv head = let on_completion w r _ st =
push_request pv (Flush head) Worker.record_event w (Event.Request (Request.view r, st, None )) ;
Lwt.return ()
let notify_operations pv peer mempool = let table = Worker.create_table Queue
push_request pv (Notify (peer, mempool))
let operations pv = let create limits net_db =
let net_state = Distributed_db.net_state net_db in
let module Handlers = struct
type self = t
let on_launch = on_launch
let on_request = on_request
let on_close = on_close
let on_error = on_error
let on_completion = on_completion
let on_no_request _ = return ()
end in
Worker.launch table limits.worker_limits
(State.Net.id net_state)
(limits, net_db)
(module Handlers)
let shutdown = Worker.shutdown
let flush w head =
Worker.push_request_and_wait w (Flush head)
let notify_operations w peer mempool =
Worker.push_request_now w (Notify (peer, mempool))
let operations w =
let pv = Worker.state w in
{ pv.validation_result with { pv.validation_result with
applied = List.rev pv.validation_result.applied }, applied = List.rev pv.validation_result.applied },
pv.pending pv.pending
let pending ?block pv = let pending ?block w =
let pv = Worker.state w in
let ops = Preapply_result.operations pv.validation_result in let ops = Preapply_result.operations pv.validation_result in
match block with match block with
| Some to_block -> | Some to_block ->
@ -501,12 +476,25 @@ let pending ?block pv =
~from_block:pv.predecessor ~to_block ops ~from_block:pv.predecessor ~to_block ops
| None -> Lwt.return ops | None -> Lwt.return ops
let timestamp pv = pv.timestamp let timestamp w =
let pv = Worker.state w in
pv.timestamp
let context pv = let context w =
let pv = Worker.state w in
Lwt.return pv.validation_state >>=? fun validation_state -> Lwt.return pv.validation_state >>=? fun validation_state ->
Prevalidation.end_prevalidation validation_state Prevalidation.end_prevalidation validation_state
let inject_operation pv op = let inject_operation w op =
push_request_and_wait pv (Inject op) >>=? fun result -> Worker.push_request_and_wait w (Inject op) >>=? fun result ->
Lwt.return result Lwt.return result
let status = Worker.status
let running_workers () = Worker.list table
let pending_requests t = Worker.pending_requests t
let current_request t = Worker.current_request t
let last_events = Worker.last_events

View File

@ -32,26 +32,25 @@ type t
type limits = { type limits = {
max_refused_operations : int ; max_refused_operations : int ;
operation_timeout : float operation_timeout : float ;
worker_limits : Worker_types.limits ;
} }
type error += Closed of Net_id.t type error += Closed of Net_id.t
(** Creation and destruction of a "prevalidation" worker. *)
val create: limits -> Distributed_db.net_db -> t Lwt.t val create: limits -> Distributed_db.net_db -> t Lwt.t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit
(** Conditionnaly inject a new operation in the node: the operation will
be ignored when it is (strongly) refused This is the
entry-point used by the P2P layer. The operation content has been
previously stored on disk. *)
val inject_operation: t -> Operation.t -> unit tzresult Lwt.t val inject_operation: t -> Operation.t -> unit tzresult Lwt.t
val flush: t -> Block_hash.t -> unit tzresult Lwt.t
val flush: t -> State.Block.t -> unit
val timestamp: t -> Time.t val timestamp: t -> Time.t
val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t
val context: t -> Updater.validation_result tzresult Lwt.t val context: t -> Updater.validation_result tzresult Lwt.t
val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t
val running_workers: unit -> (Net_id.t * t) list
val status: t -> Worker_types.worker_status
val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list
val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option
val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list

View File

@ -101,37 +101,18 @@ and chain_state = {
and chain_data = { and chain_data = {
current_head: block ; current_head: block ;
current_mempool: mempool ; current_mempool: Mempool.t ;
live_blocks: Block_hash.Set.t ; live_blocks: Block_hash.Set.t ;
live_operations: Operation_hash.Set.t ; live_operations: Operation_hash.Set.t ;
locator: Block_locator.t Lwt.t lazy_t ; locator: Block_locator.t Lwt.t lazy_t ;
} }
and mempool = {
known_valid: Operation_hash.t list ;
pending: Operation_hash.Set.t ;
}
and block = { and block = {
net_state: net_state ; net_state: net_state ;
hash: Block_hash.t ; hash: Block_hash.t ;
contents: Store.Block.contents ; contents: Store.Block.contents ;
} }
let mempool_encoding =
let open Data_encoding in
conv
(fun { known_valid ; pending } -> (known_valid, pending))
(fun (known_valid, pending) -> { known_valid ; pending })
(obj2
(req "known_valid" (dynamic_size (list Operation_hash.encoding)))
(req "pending" (dynamic_size Operation_hash.Set.encoding)))
let empty_mempool = {
known_valid = [] ;
pending = Operation_hash.Set.empty ;
}
let read_chain_store { chain_state } f = let read_chain_store { chain_state } f =
Shared.use chain_state begin fun state -> Shared.use chain_state begin fun state ->
f state.chain_store state.data f state.chain_store state.data
@ -219,7 +200,7 @@ module Net = struct
hash = current_head ; hash = current_head ;
contents = current_block ; contents = current_block ;
} ; } ;
current_mempool = empty_mempool ; current_mempool = Mempool.empty ;
live_blocks = Block_hash.Set.singleton genesis.block ; live_blocks = Block_hash.Set.singleton genesis.block ;
live_operations = Operation_hash.Set.empty ; live_operations = Operation_hash.Set.empty ;
locator = lazy (compute_locator_from_hash net_state current_head) ; locator = lazy (compute_locator_from_hash net_state current_head) ;
@ -776,6 +757,24 @@ module Register_embedded_protocol
end end
module Current_mempool = struct
let set net_state ~head mempool =
update_chain_store net_state begin fun _chain_store data ->
if Block_hash.equal head (Block.hash data.current_head) then
Lwt.return (Some { data with current_mempool = mempool },
())
else
Lwt.return (None, ())
end
let get net_state =
read_chain_store net_state begin fun _chain_store data ->
Lwt.return (Block.header data.current_head, data.current_mempool)
end
end
let read let read
?patch_context ?patch_context
~store_root ~store_root

View File

@ -168,17 +168,9 @@ val compute_locator: Net.t -> ?size:int -> Block.t -> Block_locator.t Lwt.t
val fork_testnet: val fork_testnet:
Block.t -> Protocol_hash.t -> Time.t -> Net.t tzresult Lwt.t Block.t -> Protocol_hash.t -> Time.t -> Net.t tzresult Lwt.t
type mempool = {
known_valid: Operation_hash.t list ;
pending: Operation_hash.Set.t ;
}
val empty_mempool: mempool
val mempool_encoding: mempool Data_encoding.t
type chain_data = { type chain_data = {
current_head: Block.t ; current_head: Block.t ;
current_mempool: mempool ; current_mempool: Mempool.t ;
live_blocks: Block_hash.Set.t ; live_blocks: Block_hash.Set.t ;
live_operations: Operation_hash.Set.t ; live_operations: Operation_hash.Set.t ;
locator: Block_locator.t Lwt.t lazy_t ; locator: Block_locator.t Lwt.t lazy_t ;
@ -239,6 +231,17 @@ module Registred_protocol : sig
end end
module Current_mempool : sig
val get: Net.t -> (Block_header.t * Mempool.t) Lwt.t
(** The current mempool. *)
val set: Net.t -> head:Block_hash.t -> Mempool.t -> unit Lwt.t
(** Set the current mempool. It is ignored if the current head is
not the provided one. *)
end
module Register_embedded_protocol module Register_embedded_protocol
(Env : Updater.Node_protocol_environment_sigs.V1) (Env : Updater.Node_protocol_environment_sigs.V1)
(Proto : Env.Updater.PROTOCOL) (Proto : Env.Updater.PROTOCOL)

View File

@ -73,7 +73,7 @@ module Make
| Dropbox_buffer : (Time.t * message) Lwt_dropbox.t -> dropbox buffer | Dropbox_buffer : (Time.t * message) Lwt_dropbox.t -> dropbox buffer
and 'kind t = { and 'kind t = {
limits : Worker_state.limits ; limits : Worker_types.limits ;
timeout : float option ; timeout : float option ;
parameters : Types.parameters ; parameters : Types.parameters ;
mutable (* only for init *) worker : unit Lwt.t ; mutable (* only for init *) worker : unit Lwt.t ;
@ -83,7 +83,7 @@ module Make
canceler : Lwt_canceler.t ; canceler : Lwt_canceler.t ;
name : Name.t ; name : Name.t ;
id : int ; id : int ;
mutable status : Worker_state.worker_status ; mutable status : Worker_types.worker_status ;
mutable current_request : (Time.t * Time.t * Request.view) option ; mutable current_request : (Time.t * Time.t * Request.view) option ;
table : 'kind table ; table : 'kind table ;
} }
@ -236,9 +236,9 @@ module Make
val on_close : val on_close :
self -> unit Lwt.t self -> unit Lwt.t
val on_error : val on_error :
self -> Request.view -> Worker_state.request_status -> error list -> unit tzresult Lwt.t self -> Request.view -> Worker_types.request_status -> error list -> unit tzresult Lwt.t
val on_completion : val on_completion :
self -> 'a Request.t -> 'a -> Worker_state.request_status -> unit Lwt.t self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t
end end
let create_table buffer_kind = let create_table buffer_kind =
@ -287,7 +287,7 @@ module Make
let completed = Time.now () in let completed = Time.now () in
w.current_request <- None ; w.current_request <- None ;
Handlers.on_completion w Handlers.on_completion w
request res Worker_state.{ pushed ; treated ; completed } >>= fun () -> request res Worker_types.{ pushed ; treated ; completed } >>= fun () ->
return () return ()
| Some u -> | Some u ->
Handlers.on_request w request >>= fun res -> Handlers.on_request w request >>= fun res ->
@ -296,7 +296,7 @@ module Make
let completed = Time.now () in let completed = Time.now () in
w.current_request <- None ; w.current_request <- None ;
Handlers.on_completion w Handlers.on_completion w
request res Worker_state.{ pushed ; treated ; completed } >>= fun () -> request res Worker_types.{ pushed ; treated ; completed } >>= fun () ->
return () return ()
end >>= function end >>= function
| Ok () -> | Ok () ->
@ -312,7 +312,7 @@ module Make
let completed = Time.now () in let completed = Time.now () in
w.current_request <- None ; w.current_request <- None ;
Handlers.on_error w Handlers.on_error w
request Worker_state.{ pushed ; treated ; completed } errs request Worker_types.{ pushed ; treated ; completed } errs
| None -> assert false | None -> assert false
end >>= function end >>= function
| Ok () -> | Ok () ->
@ -328,7 +328,7 @@ module Make
let launch let launch
: type kind. : type kind.
kind table -> ?timeout:float -> kind table -> ?timeout:float ->
Worker_state.limits -> Name.t -> Types.parameters -> Worker_types.limits -> Name.t -> Types.parameters ->
(module HANDLERS with type self = kind t) -> (module HANDLERS with type self = kind t) ->
kind t Lwt.t kind t Lwt.t
= fun table ?timeout limits name parameters (module Handlers) -> = fun table ?timeout limits name parameters (module Handlers) ->

View File

@ -40,7 +40,7 @@ module type EVENT = sig
(** Assigns a logging level to each event. (** Assigns a logging level to each event.
Events can be ignored for logging w.r.t. the global node configuration. Events can be ignored for logging w.r.t. the global node configuration.
Events can be ignored for introspection w.r.t. to the worker's Events can be ignored for introspection w.r.t. to the worker's
{!Worker_state.limits}. *) {!Worker_types.limits}. *)
val level : t -> Logging.level val level : t -> Logging.level
(** Serializer for the introspection RPCs *) (** Serializer for the introspection RPCs *)
@ -178,7 +178,7 @@ module Make
val on_error : val on_error :
self -> self ->
Request.view -> Request.view ->
Worker_state.request_status -> Worker_types.request_status ->
error list -> error list ->
unit tzresult Lwt.t unit tzresult Lwt.t
@ -187,7 +187,7 @@ module Make
val on_completion : val on_completion :
self -> self ->
'a Request.t -> 'a -> 'a Request.t -> 'a ->
Worker_state.request_status -> Worker_types.request_status ->
unit Lwt.t unit Lwt.t
end end
@ -195,7 +195,7 @@ module Make
Parameter [queue_size] not passed means unlimited queue. *) Parameter [queue_size] not passed means unlimited queue. *)
val launch : val launch :
'kind table -> ?timeout:float -> 'kind table -> ?timeout:float ->
Worker_state.limits -> Name.t -> Types.parameters -> Worker_types.limits -> Name.t -> Types.parameters ->
(module HANDLERS with type self = 'kind t) -> (module HANDLERS with type self = 'kind t) ->
'kind t Lwt.t 'kind t Lwt.t
@ -257,7 +257,7 @@ module Make
val pending_requests : _ queue t -> (Time.t * Request.view) list val pending_requests : _ queue t -> (Time.t * Request.view) list
(** Get the running status of a worker. *) (** Get the running status of a worker. *)
val status : _ t -> Worker_state.worker_status val status : _ t -> Worker_types.worker_status
(** Get the request being treated by a worker. (** Get the request being treated by a worker.
Gives the time the request was pushed, and the time its Gives the time the request was pushed, and the time its
@ -269,7 +269,7 @@ module Make
(** Lists the running workers in this group. (** Lists the running workers in this group.
After they are killed, workers are kept in the table After they are killed, workers are kept in the table
for a number of seconds given in the {!Worker_state.limits}. *) for a number of seconds given in the {!Worker_types.limits}. *)
val list : 'a table -> (Name.t * 'a t) list val list : 'a table -> (Name.t * 'a t) list
end end

View File

@ -0,0 +1,14 @@
(jbuild_version 1)
(library
((name tezos_node_shell_base)
(public_name tezos-node-shell-base)
(libraries (tezos-base
tezos-node-p2p-base))
(flags (:standard -open Tezos_base__TzPervasives
-open Tezos_node_p2p_base))))
(alias
((name runtest_indent)
(deps ((glob_files *.ml) (glob_files *.mli)))
(action (run bash ${libexec:tezos-stdlib:test-ocp-indent.sh} ${^}))))

View File

@ -7,28 +7,22 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
open State type t = {
type t = State.mempool = {
known_valid: Operation_hash.t list ; known_valid: Operation_hash.t list ;
pending: Operation_hash.Set.t ; pending: Operation_hash.Set.t ;
} }
type mempool = t type mempool = t
let encoding = State.mempool_encoding let encoding =
let empty = State.empty_mempool let open Data_encoding in
conv
let set net_state ~head mempool = (fun { known_valid ; pending } -> (known_valid, pending))
update_chain_store net_state begin fun _chain_store data -> (fun (known_valid, pending) -> { known_valid ; pending })
if Block_hash.equal head (Block.hash data.current_head) then (obj2
Lwt.return (Some { data with current_mempool = mempool }, (req "known_valid" (dynamic_size (list Operation_hash.encoding)))
()) (req "pending" (dynamic_size Operation_hash.Set.encoding)))
else
Lwt.return (None, ())
end
let get net_state =
read_chain_store net_state begin fun _chain_store data ->
Lwt.return (Block.header data.current_head, data.current_mempool)
end
let empty = {
known_valid = [] ;
pending = Operation_hash.Set.empty ;
}

View File

@ -10,7 +10,7 @@
(** Tezos Shell Module - Mempool, a.k.a. the operations safe to be (** Tezos Shell Module - Mempool, a.k.a. the operations safe to be
broadcasted. *) broadcasted. *)
type t = State.mempool = { type t = {
known_valid: Operation_hash.t list ; known_valid: Operation_hash.t list ;
(** A valid sequence of operations on top of the current head. *) (** A valid sequence of operations on top of the current head. *)
pending: Operation_hash.Set.t ; pending: Operation_hash.Set.t ;
@ -22,11 +22,3 @@ val encoding: mempool Data_encoding.t
val empty: mempool val empty: mempool
(** Empty mempool. *) (** Empty mempool. *)
val get: State.Net.t -> (Block_header.t * mempool) Lwt.t
(** The current mempool, *)
val set: State.Net.t -> head:Block_hash.t -> mempool -> unit Lwt.t
(** Set the current mempool. It is ignored if the current head is
not the provided one. *)

View File

@ -0,0 +1,182 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
module Request = struct
type 'a t =
| Flush : Block_hash.t -> unit t
| Notify : P2p_types.Peer_id.t * Mempool.t -> unit t
| Inject : Operation.t -> unit tzresult t
| Arrived : Operation_hash.t * Operation.t -> unit t
| Advertise : unit t
type view = View : _ t -> view
let view req = View req
let encoding =
let open Data_encoding in
union
[ case (Tag 0)
(obj2
(req "request" (constant "flush"))
(req "block" Block_hash.encoding))
(function View (Flush hash) -> Some ((), hash) | _ -> None)
(fun ((), hash) -> View (Flush hash)) ;
case (Tag 1)
(obj3
(req "request" (constant "notify"))
(req "peer" P2p_types.Peer_id.encoding)
(req "mempool" Mempool.encoding))
(function View (Notify (peer, mempool)) -> Some ((), peer, mempool) | _ -> None)
(fun ((), peer, mempool) -> View (Notify (peer, mempool))) ;
case (Tag 2)
(obj2
(req "request" (constant "inject"))
(req "operation" Operation.encoding))
(function View (Inject op) -> Some ((), op) | _ -> None)
(fun ((), op) -> View (Inject op)) ;
case (Tag 3)
(obj3
(req "request" (constant "arrived"))
(req "operation_hash" Operation_hash.encoding)
(req "operation" Operation.encoding))
(function View (Arrived (oph, op)) -> Some ((), oph, op) | _ -> None)
(fun ((), oph, op) -> View (Arrived (oph, op))) ;
case (Tag 4)
(obj1 (req "request" (constant "advertise")))
(function View Advertise -> Some () | _ -> None)
(fun () -> View Advertise) ]
let pp ppf (View r) = match r with
| Flush hash ->
Format.fprintf ppf "switching to new head %a"
Block_hash.pp hash
| Notify (id, { Mempool.known_valid ; pending }) ->
Format.fprintf ppf "@[<v 2>notified by %a of operations"
P2p_types.Peer_id.pp id ;
List.iter
(fun oph ->
Format.fprintf ppf "@,%a (applied)"
Operation_hash.pp oph)
known_valid ;
List.iter
(fun oph ->
Format.fprintf ppf "@,%a (pending)"
Operation_hash.pp oph)
(Operation_hash.Set.elements pending) ;
Format.fprintf ppf "@]"
| Inject op ->
Format.fprintf ppf "injecting operation %a"
Operation_hash.pp (Operation.hash op)
| Arrived (oph, _) ->
Format.fprintf ppf "operation %a arrived"
Operation_hash.pp oph
| Advertise ->
Format.fprintf ppf "advertising pending operations"
end
module Event = struct
type t =
| Request of (Request.view * Worker_types.request_status * error list option)
| Debug of string
let level req =
let open Request in
match req with
| Debug _ -> Logging.Debug
| Request (View (Flush _), _, _) -> Logging.Notice
| Request (View (Notify _), _, _) -> Logging.Debug
| Request (View (Inject _), _, _) -> Logging.Notice
| Request (View (Arrived _), _, _) -> Logging.Debug
| Request (View Advertise, _, _) -> Logging.Debug
let encoding error_encoding =
let open Data_encoding in
union
[ case (Tag 0)
(obj1 (req "message" string))
(function Debug msg -> Some msg | _ -> None)
(fun msg -> Debug msg) ;
case (Tag 1)
(obj2
(req "request" Request.encoding)
(req "status" Worker_types.request_status_encoding))
(function Request (req, t, None) -> Some (req, t) | _ -> None)
(fun (req, t) -> Request (req, t, None)) ;
case (Tag 2)
(obj3
(req "error" error_encoding)
(req "failed_request" Request.encoding)
(req "status" Worker_types.request_status_encoding))
(function Request (req, t, Some errs) -> Some (errs, req, t) | _ -> None)
(fun (errs, req, t) -> Request (req, t, Some errs)) ]
let pp ppf = function
| Debug msg -> Format.fprintf ppf "%s" msg
| Request (view, { pushed ; treated ; completed }, None) ->
Format.fprintf ppf
"@[<v 2>%a@,\
Pushed: %a, Treated: %a, Completed: %a@]"
Request.pp view
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
| Request (view, { pushed ; treated ; completed }, Some errors) ->
Format.fprintf ppf
"@[<v 2>%a@,\
Pushed: %a, Treated: %a, Failed: %a@,\
Error: %a@]"
Request.pp view
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
Error_monad.pp_print_error errors
end
module Worker_state = struct
type view =
{ head : Block_hash.t ;
timestamp : Time.t ;
fetching : Operation_hash.Set.t ;
pending : Operation_hash.Set.t ;
applied : Operation_hash.t list ;
delayed : Operation_hash.Set.t }
let encoding =
let open Data_encoding in
conv
(fun { head ; timestamp ; fetching ; pending ; applied ; delayed } ->
(head, timestamp, fetching, pending, applied, delayed))
(fun (head, timestamp, fetching, pending, applied, delayed) ->
{ head ; timestamp ; fetching ; pending ; applied ; delayed })
(obj6
(req "head" Block_hash.encoding)
(req "timestamp" Time.encoding)
(req "fetching" Operation_hash.Set.encoding)
(req "pending" Operation_hash.Set.encoding)
(req "applied" (list Operation_hash.encoding))
(req "delayed" Operation_hash.Set.encoding))
let pp ppf view =
Format.fprintf ppf
"@[<v 0>\
Head: %a@,\
Timestamp: %a@,
@[<v 2>Fetching: %a@]@,
@[<v 2>Pending: %a@]@,
@[<v 2>Applied: %a@]@,
@[<v 2>Delayed: %a@]@]"
Block_hash.pp
view.head
Time.pp_hum
view.timestamp
(Format.pp_print_list Operation_hash.pp)
(Operation_hash.Set.elements view.fetching)
(Format.pp_print_list Operation_hash.pp)
(Operation_hash.Set.elements view.pending)
(Format.pp_print_list Operation_hash.pp)
view.applied
(Format.pp_print_list Operation_hash.pp)
(Operation_hash.Set.elements view.delayed)
end

View File

@ -0,0 +1,42 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
module Request : sig
type 'a t =
| Flush : Block_hash.t -> unit t
| Notify : P2p_types.Peer_id.t * Mempool.t -> unit t
| Inject : Operation.t -> unit tzresult t
| Arrived : Operation_hash.t * Operation.t -> unit t
| Advertise : unit t
type view = View : _ t -> view
val view : 'a t -> view
val encoding : view Data_encoding.t
val pp : Format.formatter -> view -> unit
end
module Event : sig
type t =
| Request of (Request.view * Worker_types.request_status * error list option)
| Debug of string
val level : t -> Logging.level
val encoding : error list Data_encoding.t -> t Data_encoding.t
val pp : Format.formatter -> t -> unit
end
module Worker_state : sig
type view =
{ head : Block_hash.t ;
timestamp : Time.t ;
fetching : Operation_hash.Set.t ;
pending : Operation_hash.Set.t ;
applied : Operation_hash.t list ;
delayed : Operation_hash.Set.t }
val encoding : view Data_encoding.t
val pp : Format.formatter -> view -> unit
end

View File

@ -0,0 +1,24 @@
opam-version: "1.2"
version: "dev"
maintainer: "contact@tezos.com"
authors: [ "Tezos devteam" ]
homepage: "https://www.tezos.com/"
bug-reports: "https://gitlab.com/tezos/tezos/issues"
dev-repo: "https://gitlab.com/tezos/tezos.git"
license: "unreleased"
depends: [
"ocamlfind" { build }
"jbuilder" { build & >= "1.0+beta15" }
"base-bigarray"
"mtime.clock.os"
"ocplib-resto-cohttp"
"tezos-base"
"tezos-worker"
"tezos-node-p2p-base"
]
build: [
[ "jbuilder" "build" "-p" name "-j" jobs ]
]
build-test: [
[ "jbuilder" "runtest" "-p" name "-j" jobs ]
]