Lwt: do not use deprecated functions

This commit is contained in:
Vincent Bernardoff 2017-12-29 15:29:29 +01:00 committed by Benjamin Canou
parent 0ce7c79367
commit 58ad60f38a
4 changed files with 30 additions and 27 deletions

View File

@ -120,7 +120,8 @@ end = struct
} }
and status = and status =
| Pending of { wakener : value tzresult Lwt.u ; | Pending of { waiter : value tzresult Lwt.t ;
wakener : value tzresult Lwt.u ;
mutable waiters : int ; mutable waiters : int ;
param : param } param : param }
| Found of value | Found of value
@ -216,20 +217,20 @@ end = struct
| exception Not_found -> begin | exception Not_found -> begin
let waiter, wakener = Lwt.wait () in let waiter, wakener = Lwt.wait () in
Memory_table.add s.memory k Memory_table.add s.memory k
(Pending { wakener ; waiters = 1 ; param }) ; (Pending { waiter ; wakener ; waiters = 1 ; param }) ;
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
wrap s k ?timeout waiter wrap s k ?timeout waiter
end end
| Pending data -> | Pending data ->
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
data.waiters <- data.waiters + 1 ; data.waiters <- data.waiters + 1 ;
wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener) wrap s k ?timeout data.waiter
| Found v -> return v | Found v -> return v
end end
| Pending data -> | Pending data ->
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
data.waiters <- data.waiters + 1 ; data.waiters <- data.waiters + 1 ;
wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener) wrap s k ?timeout data.waiter
| Found v -> return v | Found v -> return v
let prefetch s ?peer ?timeout k param = let prefetch s ?peer ?timeout k param =

View File

@ -19,6 +19,7 @@ type t = {
timeout: timeout ; timeout: timeout ;
bootstrap_threshold: int ; bootstrap_threshold: int ;
mutable bootstrapped: bool ; mutable bootstrapped: bool ;
bootstrapped_waiter: unit Lwt.t ;
bootstrapped_wakener: unit Lwt.u ; bootstrapped_wakener: unit Lwt.u ;
valid_block_input: State.Block.t Lwt_watcher.input ; valid_block_input: State.Block.t Lwt_watcher.input ;
global_valid_block_input: State.Block.t Lwt_watcher.input ; global_valid_block_input: State.Block.t Lwt_watcher.input ;
@ -130,7 +131,7 @@ let rec create
let valid_block_input = Lwt_watcher.create_input () in let valid_block_input = Lwt_watcher.create_input () in
let new_head_input = Lwt_watcher.create_input () in let new_head_input = Lwt_watcher.create_input () in
let canceler = Lwt_canceler.create () in let canceler = Lwt_canceler.create () in
let _, bootstrapped_wakener = Lwt.wait () in let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in
let nv = { let nv = {
db ; net_state ; net_db ; block_validator ; db ; net_state ; net_db ; block_validator ;
prevalidator ; prevalidator ;
@ -139,6 +140,7 @@ let rec create
new_head_input ; new_head_input ;
parent ; max_child_ttl ; child = None ; parent ; max_child_ttl ; child = None ;
bootstrapped = (bootstrap_threshold <= 0) ; bootstrapped = (bootstrap_threshold <= 0) ;
bootstrapped_waiter ;
bootstrapped_wakener ; bootstrapped_wakener ;
bootstrap_threshold ; bootstrap_threshold ;
active_peers = active_peers =
@ -336,8 +338,8 @@ let validate_block nv ?(force = false) hash block operations =
else else
failwith "Fitness too low" failwith "Fitness too low"
let bootstrapped { bootstrapped_wakener } = let bootstrapped { bootstrapped_waiter } =
Lwt.protected (Lwt.waiter_of_wakener bootstrapped_wakener) Lwt.protected bootstrapped_waiter
let valid_block_watcher { valid_block_input } = let valid_block_watcher { valid_block_input } =
Lwt_watcher.create_stream valid_block_input Lwt_watcher.create_stream valid_block_input

View File

@ -14,7 +14,7 @@ exception Closed
type 'a t = type 'a t =
{ mutable data : 'a option ; { mutable data : 'a option ;
mutable closed : bool ; mutable closed : bool ;
mutable put_waiter : unit Lwt.u option ; mutable put_waiter : (unit Lwt.t * unit Lwt.u) option ;
} }
let create () = let create () =
@ -26,9 +26,9 @@ let create () =
let notify_put dropbox = let notify_put dropbox =
match dropbox.put_waiter with match dropbox.put_waiter with
| None -> () | None -> ()
| Some w -> | Some (_waiter, wakener) ->
dropbox.put_waiter <- None ; dropbox.put_waiter <- None ;
Lwt.wakeup_later w () Lwt.wakeup_later wakener ()
let put dropbox elt = let put dropbox elt =
if dropbox.closed then if dropbox.closed then
@ -48,14 +48,14 @@ let close dropbox =
let wait_put ~timeout dropbox = let wait_put ~timeout dropbox =
match dropbox.put_waiter with match dropbox.put_waiter with
| Some w -> | Some (waiter, _wakener) ->
Lwt.choose [ Lwt.choose [
timeout ; timeout ;
Lwt.protected (Lwt.waiter_of_wakener w) Lwt.protected waiter
] ]
| None -> | None ->
let waiter, wakener = Lwt.wait () in let waiter, wakener = Lwt.wait () in
dropbox.put_waiter <- Some wakener ; dropbox.put_waiter <- Some (waiter, wakener) ;
Lwt.choose [ Lwt.choose [
timeout ; timeout ;
Lwt.protected waiter ; Lwt.protected waiter ;

View File

@ -65,7 +65,7 @@ end
type trigger = type trigger =
| Absent | Absent
| Present | Present
| Waiting of unit Lwt.u | Waiting of unit Lwt.t * unit Lwt.u
let trigger () : (unit -> unit) * (unit -> unit Lwt.t) = let trigger () : (unit -> unit) * (unit -> unit Lwt.t) =
let state = ref Absent in let state = ref Absent in
@ -73,28 +73,28 @@ let trigger () : (unit -> unit) * (unit -> unit Lwt.t) =
match !state with match !state with
| Absent -> state := Present | Absent -> state := Present
| Present -> () | Present -> ()
| Waiting u -> | Waiting (_waiter, wakener) ->
state := Absent; state := Absent;
Lwt.wakeup u () Lwt.wakeup wakener ()
in in
let wait () = let wait () =
match !state with match !state with
| Absent -> | Absent ->
let waiter, u = Lwt.wait () in let waiter, wakener = Lwt.wait () in
state := Waiting u; state := Waiting (waiter, wakener) ;
waiter waiter
| Present -> | Present ->
state := Absent; state := Absent;
Lwt.return_unit Lwt.return_unit
| Waiting u -> | Waiting (waiter, _wakener) ->
Lwt.waiter_of_wakener u waiter
in in
trigger, wait trigger, wait
type 'a queue = type 'a queue =
| Absent | Absent
| Present of 'a list ref | Present of 'a list ref
| Waiting of 'a list Lwt.u | Waiting of ('a list Lwt.t * 'a list Lwt.u)
let queue () : ('a -> unit) * (unit -> 'a list Lwt.t) = let queue () : ('a -> unit) * (unit -> 'a list Lwt.t) =
let state = ref Absent in let state = ref Absent in
@ -102,21 +102,21 @@ let queue () : ('a -> unit) * (unit -> 'a list Lwt.t) =
match !state with match !state with
| Absent -> state := Present (ref [v]) | Absent -> state := Present (ref [v])
| Present r -> r := v :: !r | Present r -> r := v :: !r
| Waiting u -> | Waiting (_waiter, wakener) ->
state := Absent; state := Absent;
Lwt.wakeup u [v] Lwt.wakeup wakener [v]
in in
let wait () = let wait () =
match !state with match !state with
| Absent -> | Absent ->
let waiter, u = Lwt.wait () in let waiter, wakener = Lwt.wait () in
state := Waiting u; state := Waiting (waiter, wakener) ;
waiter waiter
| Present r -> | Present r ->
state := Absent; state := Absent;
Lwt.return (List.rev !r) Lwt.return (List.rev !r)
| Waiting u -> | Waiting (waiter, _wakener) ->
Lwt.waiter_of_wakener u waiter
in in
queue, wait queue, wait