Module Async_kernel.Throttle
A way to limit the number of concurrent computations.
A throttle is essentially a pipe to which one can feed jobs.
A throttle schedules asynchronous jobs so that at any point in time no more than max_concurrent_jobs jobs are running. A job f is considered to be running from the time f () is executed until the deferred returned by f () becomes determined, or f () raises. The throttle initiates jobs on a first-come first-served basis.
One can use create_with to create a throttle with "resources" that one would like to make available to concurrent jobs and to guarantee that different jobs access different resources.
A throttle is killed if one of its jobs raises an exception and the throttle has continue_on_error = false. A throttle can also be explicitly killed with kill. If a throttle is killed, then all jobs in it that haven't yet started are aborted, i.e., they will not start and will become determined with `Aborted. Jobs that had already started will continue, and return `Ok or `Raised as usual when they finish. Jobs enqueued into a killed throttle will be immediately aborted.
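The scheduling behaviour described above can be sketched as follows. This is a minimal illustration, assuming the core and async libraries are available (Async re-exports this module as Throttle); the job bodies, the delay, and the scheduler boilerplate at the bottom are illustrative choices, not part of this module.

```ocaml
open Core
open Async

let main () =
  (* At most two jobs run at once; a raising job would kill the throttle. *)
  let throttle = Throttle.create ~continue_on_error:false ~max_concurrent_jobs:2 in
  let run_job i =
    Throttle.enqueue throttle (fun () ->
      printf "job %d started\n" i;
      (* Clock_ns.after stands in for real asynchronous work. *)
      Clock_ns.after (Time_ns.Span.of_ms 50.)
      >>| fun () -> printf "job %d finished\n" i)
  in
  (* Enqueue five jobs up front; the throttle starts them in enqueue order. *)
  Deferred.List.iter ~how:`Parallel [ 1; 2; 3; 4; 5 ] ~f:run_job

let () =
  don't_wait_for (main () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

Jobs 3 to 5 wait in the throttle until one of the running jobs' deferreds becomes determined.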
module Deferred = Async_kernel__.Deferred1
module T2 : sig ... end
We use a phantom type to distinguish between throttles, which have max_concurrent_jobs >= 1, and sequencers, which have max_concurrent_jobs = 1. All operations are available on both. We make the distinction because it is sometimes useful to know from the type of a throttle that it is a sequencer and that at most one job can be running at a time.
type 'a t = ('a, [ `throttle ]) T2.t
val sexp_of_t : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a t -> Ppx_sexp_conv_lib.Sexp.t
include Core_kernel.Invariant.S1 with type 'a t := 'a t
val invariant : 'a Base__.Invariant_intf.inv -> 'a t Base__.Invariant_intf.inv
val create : continue_on_error:bool -> max_concurrent_jobs:int -> unit t
create ~continue_on_error ~max_concurrent_jobs returns a throttle that will run up to max_concurrent_jobs jobs concurrently. If some job raises an exception, then the throttle will be killed, unless continue_on_error is true.
val create_with : continue_on_error:bool -> 'a list -> 'a t
create_with ~continue_on_error job_resources returns a throttle that will run up to List.length job_resources jobs concurrently, and will ensure that all running jobs are supplied distinct elements of job_resources.
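As a rough sketch of create_with, the following program shares three resources among six jobs. The strings "conn-a", "conn-b" and "conn-c" are hypothetical stand-ins for real handles such as connections, and the core and async libraries are assumed.

```ocaml
open Core
open Async

let main () =
  (* Three hypothetical resources; plain strings stand in for real handles. *)
  let throttle =
    Throttle.create_with ~continue_on_error:true [ "conn-a"; "conn-b"; "conn-c" ]
  in
  let query i =
    (* Each running job receives a resource that no other running job holds. *)
    Throttle.enqueue throttle (fun conn ->
      printf "query %d uses %s\n" i conn;
      Clock_ns.after (Time_ns.Span.of_ms 10.))
  in
  Deferred.List.iter ~how:`Parallel (List.init 6 ~f:Fn.id) ~f:query

let () =
  don't_wait_for (main () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

Because the resource list has three elements, at most three queries run at the same time.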
val sexp_of_outcome : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a outcome -> Ppx_sexp_conv_lib.Sexp.t
val enqueue' : ('a, _) T2.t -> ('a -> 'b Deferred.t) -> 'b outcome Deferred.t
enqueue t job schedules job to be run as soon as possible. Jobs are guaranteed to be started in the order they are enqueued and to not be started during the call to enqueue. If t is dead, then job will be immediately aborted (for enqueue, this will send an exception to the monitor in effect).
val enqueue : ('a, _) T2.t -> ('a -> 'b Deferred.t) -> 'b Deferred.t
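The difference between the two functions is easiest to see by matching on the outcome returned by enqueue'. The sketch below assumes the core and async libraries; the describe helper is hypothetical and exists only for printing.

```ocaml
open Core
open Async

(* Hypothetical helper: render an outcome as a string. *)
let describe = function
  | `Ok n -> sprintf "ok %d" n
  | `Aborted -> "aborted"
  | `Raised exn -> "raised " ^ Exn.to_string exn

let main () =
  (* continue_on_error:true, so a raising job does not kill the throttle. *)
  let throttle = Throttle.create ~continue_on_error:true ~max_concurrent_jobs:1 in
  Throttle.enqueue' throttle (fun () -> return 42)
  >>= fun first ->
  Throttle.enqueue' throttle (fun () -> failwith "boom")
  >>= fun second ->
  (* After kill, newly enqueued jobs are aborted rather than run. *)
  Throttle.kill throttle;
  Throttle.enqueue' throttle (fun () -> return 0)
  >>| fun third ->
  List.iter [ first; second; third ] ~f:(fun o -> print_endline (describe o))

let () =
  don't_wait_for (main () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

With enqueue instead of enqueue', the second and third calls would not return a value; the failure would be sent to the monitor in effect.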
val monad_sequence_how : ?how:Monad_sequence.how -> f:('a -> 'b Deferred.t) -> ('a -> 'b Deferred.t) Core_kernel.Staged.t
monad_sequence_how ~how ~f returns a function that behaves like f, except that it uses a throttle to limit the number of concurrent invocations that can be running simultaneously. The throttle has continue_on_error = false.
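A possible use of monad_sequence_how is wrapping a function so that callers cannot exceed a concurrency limit. This sketch assumes the core and async libraries in a version that provides the signature documented here; fetch is a hypothetical function that simulates work with a timer.

```ocaml
open Core
open Async

(* Hypothetical job: pretend to fetch something and return a derived value. *)
let fetch id =
  printf "fetching %d\n" id;
  Clock_ns.after (Time_ns.Span.of_ms 20.) >>| fun () -> id * 2

let main () =
  (* The staged result is unstaged once, so all calls share one throttle. *)
  let throttled_fetch =
    Staged.unstage (Throttle.monad_sequence_how ~how:(`Max_concurrent_jobs 2) ~f:fetch)
  in
  Deferred.all (List.map [ 1; 2; 3; 4 ] ~f:throttled_fetch)
  >>| fun results -> List.iter results ~f:(fun r -> printf "result %d\n" r)

let () =
  don't_wait_for (main () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

The Staged.t wrapper makes it explicit that the throttle is created when the function is built, not on every call.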
val monad_sequence_how2 : ?how:Monad_sequence.how -> f:('a1 -> 'a2 -> 'b Deferred.t) -> ('a1 -> 'a2 -> 'b Deferred.t) Core_kernel.Staged.t
val prior_jobs_done : (_, _) T2.t -> unit Deferred.t
prior_jobs_done t becomes determined when all of the jobs that were previously enqueued in t have completed.
val max_concurrent_jobs : (_, _) T2.t -> int
max_concurrent_jobs t returns the maximum number of jobs that t will run concurrently.
val num_jobs_running : (_, _) T2.t -> int
num_jobs_running t returns the number of jobs that t is currently running. It is guaranteed that if num_jobs_running t < max_concurrent_jobs t then num_jobs_waiting_to_start t = 0. That is, the throttle always uses its maximum concurrency if possible.
val num_jobs_waiting_to_start : (_, _) T2.t -> int
num_jobs_waiting_to_start t returns the number of jobs that have been enqueued but have not yet started.
val capacity_available : (_, _) T2.t -> unit Deferred.t
capacity_available t becomes determined the next time that t has fewer than max_concurrent_jobs t jobs running, and hence an enqueued job would start immediately.
val kill : (_, _) T2.t -> unit
kill t kills t, which aborts all enqueued jobs that haven't started and all jobs enqueued in the future. kill also initiates the at_kill clean functions. If t has already been killed, then calling kill t has no effect.
val is_dead : (_, _) T2.t -> bool
is_dead t returns true if t was killed, either by kill or by an unhandled exception in a job.
val at_kill : ('a, _) T2.t -> ('a -> unit Deferred.t) -> unit
at_kill t clean runs clean on each resource when t is killed, either by kill or by an unhandled exception. clean is called on each resource as it becomes available, i.e., immediately if the resource isn't currently in use, otherwise when the job currently using the resource finishes. If a call to clean fails, the exception is raised to the monitor in effect when at_kill was called.
val cleaned : (_, _) T2.t -> unit Deferred.t
cleaned t becomes determined after t is killed, all its running jobs have completed, and all at_kill clean functions have completed.
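The interplay of kill, at_kill, is_dead and cleaned might look like the sketch below. It assumes the core and async libraries; the resource names "res-1" and "res-2" are hypothetical, and the clean function only prints.

```ocaml
open Core
open Async

let main () =
  let throttle = Throttle.create_with ~continue_on_error:false [ "res-1"; "res-2" ] in
  (* Register a clean function; it runs once per resource after the kill. *)
  Throttle.at_kill throttle (fun res ->
    printf "cleaning %s\n" res;
    return ());
  (* Jobs never start during the call to enqueue', so this one is still waiting. *)
  let pending =
    Throttle.enqueue' throttle (fun _res -> Clock_ns.after (Time_ns.Span.of_ms 10.))
  in
  Throttle.kill throttle;
  pending
  >>= fun outcome ->
  (match outcome with
   | `Aborted -> printf "pending job was aborted\n"
   | `Ok () | `Raised _ -> printf "pending job ran\n");
  (* Wait for all clean functions before inspecting the throttle. *)
  Throttle.cleaned throttle
  >>| fun () -> printf "is_dead = %b\n" (Throttle.is_dead throttle)

let () =
  don't_wait_for (main () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

Waiting on cleaned before shutting down is a reasonable way to make sure resources such as file handles are released.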
module Sequencer : sig ... end
A sequencer is a throttle that is specialized to only allow one job at a time and to, by default, not continue on error.
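A sequencer is commonly used to serialize access to shared mutable state. The signature of Sequencer is elided on this page, so the sketch below assumes that Sequencer.create takes the state as its positional argument (with an optional continue_on_error); treat that call as an assumption to check against the Sequencer documentation. The core and async libraries are also assumed.

```ocaml
open Core
open Async

let main () =
  (* Assumed API: Sequencer.create wraps a single piece of state. *)
  let seq = Throttle.Sequencer.create (ref 0) in
  let bump () =
    Throttle.enqueue seq (fun counter ->
      let seen = !counter in
      (* The read-wait-write below would race without the sequencer. *)
      Clock_ns.after (Time_ns.Span.of_ms 1.) >>| fun () -> counter := seen + 1)
  in
  Deferred.all_unit (List.init 10 ~f:(fun _ -> bump ()))
  >>= fun () ->
  Throttle.enqueue seq (fun counter -> return !counter)
  >>| fun total -> printf "total = %d\n" total

let () =
  don't_wait_for (main () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

Since at most one job runs at a time, each increment sees the result of the previous one.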