Module Async_unix__.Scheduler
type t
= Async_unix__.Raw_scheduler.t
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
include module type of sig ... end
type 'a with_options
= ?monitor:Async_kernel.Monitor.t -> ?priority:Async_kernel.Priority.t -> 'a
val current_execution_context : unit -> Async_kernel.Execution_context.t
val within_context : Async_kernel.Execution_context.t -> (unit -> 'a) -> ('a, unit) Core_kernel.Result.t
val within' : ((unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t) with_options
val within : ((unit -> unit) -> unit) with_options
val within_v : ((unit -> 'a) -> 'a option) with_options
val with_local : 'a Core_kernel.Univ_map.Key.t -> 'a option -> f:(unit -> 'b) -> 'b
val find_local : 'a Core_kernel.Univ_map.Key.t -> 'a option
val schedule' : ((unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t) with_options
val schedule : ((unit -> unit) -> unit) with_options
val enqueue_job : Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit
val thread_safe_enqueue_job : Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit
val preserve_execution_context : ('a -> unit) -> ('a -> unit) Core_kernel.Staged.t
val preserve_execution_context' : ('a -> 'b Async_kernel.Deferred.t) -> ('a -> 'b Async_kernel.Deferred.t) Core_kernel.Staged.t
val cycle_start : unit -> Core_kernel.Time.t
val cycle_start_ns : unit -> Async_kernel__.Import.Time_ns.t
val cycle_times : unit -> Core_kernel.Time.Span.t Async_kernel__.Async_stream.t
val cycle_times_ns : unit -> Async_kernel__.Import.Time_ns.Span.t Async_kernel__.Async_stream.t
val long_cycles : at_least:Async_kernel__.Import.Time_ns.Span.t -> Async_kernel__.Import.Time_ns.Span.t Async_kernel__.Async_stream.t
val cycle_count : unit -> int
val total_cycle_time : unit -> Async_kernel__.Import.Time_ns.Span.t
val event_precision : unit -> Core_kernel.Time.Span.t
val event_precision_ns : unit -> Async_kernel__.Import.Time_ns.Span.t
val force_current_cycle_to_end : unit -> unit
val set_max_num_jobs_per_priority_per_cycle : int -> unit
val max_num_jobs_per_priority_per_cycle : unit -> int
val set_record_backtraces : bool -> unit
val yield : unit -> unit Async_kernel.Deferred.t
val yield_until_no_jobs_remain : unit -> unit Async_kernel.Deferred.t
val yield_every : n:int -> (unit -> unit Async_kernel.Deferred.t) Core_kernel.Staged.t
val num_jobs_run : unit -> int
val num_pending_jobs : unit -> int
module Private = Async_kernel__.Scheduler
val t : unit -> t
t ()
returns the Async scheduler. If the scheduler hasn't been created yet, this will create it and acquire the Async lock.
val max_num_open_file_descrs : unit -> int
val max_num_threads : unit -> int
val go : ?raise_unhandled_exn:bool -> unit -> Core.never_returns
go ?raise_unhandled_exn ()
passes control to Async, at which point Async starts running handlers, one by one without interruption, until there are no more handlers to run. When Async is out of handlers, it blocks until the outside world schedules more of them. Because of this, Async programs do not exit until shutdown is called.
go () calls handle_signal Sys.sigpipe, which causes the SIGPIPE signal to be ignored. Low-level syscalls (e.g., write) still raise EPIPE.
If any Async job raises an unhandled exception that is not handled by any monitor, Async execution ceases. Then, by default, Async pretty-prints the exception and exits with status 1. If you don't want this, pass ~raise_unhandled_exn:true, which will cause the unhandled exception to be raised to the caller of go ().
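A minimal sketch of this pattern, assuming the module is used through the usual Core and Async top-level libraries: set up one job, then hand control to the scheduler; the program runs until shutdown is called.

  open Core
  open Async

  let () =
    (* Schedule a job before handing control to the scheduler. *)
    upon (after (Time.Span.of_sec 1.)) (fun () ->
      print_endline "done";
      shutdown 0);
    (* Does not return; the program exits via [shutdown]. *)
    never_returns (Scheduler.go ())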
val go_main : ?raise_unhandled_exn:bool -> ?file_descr_watcher:Async_unix__.Config.File_descr_watcher.t -> ?max_num_open_file_descrs:int -> ?max_num_threads:int -> main:(unit -> unit) -> unit -> Core.never_returns
go_main
is like go, except that you supply a main function that will be run to initialize the Async computation, and go_main will fail if any Async has been used prior to go_main being called. Moreover, it allows you to configure more static options of the scheduler.
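A sketch of the same program using go_main, again assuming the usual Core/Async setup; all Async work happens inside main, and the optional arguments configure the scheduler before it starts.

  open Core
  open Async

  (* [go_main] checks that nothing has touched Async before [main] runs. *)
  let main () = upon (after (Time.Span.of_sec 1.)) (fun () -> shutdown 0)

  let () = never_returns (Scheduler.go_main ~max_num_threads:4 ~main ())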
val report_long_cycle_times : ?cutoff:Core.Time.Span.t -> unit -> unit
report_long_cycle_times ?cutoff ()
arranges for a warning to be printed to stderr whenever an Async cycle runs longer than cutoff, whose default is 1s.
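For example, a sketch that lowers the warning threshold to 100ms (Core.Time.Span.of_ms assumed available as usual):

  let () =
    Async.Scheduler.report_long_cycle_times
      ~cutoff:(Core.Time.Span.of_ms 100.) ()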
val set_max_inter_cycle_timeout : Core.Time.Span.t -> unit
set_max_inter_cycle_timeout span
sets the maximum amount of time the scheduler will remain blocked (on epoll or select) between cycles.
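For example, a sketch that caps the blocking time at 50ms:

  let () =
    Async.Scheduler.set_max_inter_cycle_timeout (Core.Time.Span.of_ms 50.)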
val set_check_invariants : bool -> unit
set_check_invariants do_check
sets whether Async should check invariants of its internal data structures. set_check_invariants true can substantially slow down your program.
val set_detect_invalid_access_from_thread : bool -> unit
set_detect_invalid_access_from_thread do_check
sets whether Async routines should check if they are being accessed from some thread other than the thread currently holding the Async lock, which is not allowed and can lead to very confusing behavior.
type 'b folder
=
{
folder : 'a. 'b -> t -> (t, 'a) Core.Field.t -> 'b;
}
val fold_fields : init:'b -> 'b folder -> 'b
fold_fields ~init folder
folds folder over each field in the scheduler. The fields themselves are not exposed -- folder must be a polymorphic function that can work on any field. So, it's only useful for generic operations, e.g., getting the size of each field.
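A sketch of such a generic operation: counting the scheduler's fields. The folder ignores each field's contents, which is exactly the kind of field-agnostic function the polymorphic record type requires.

  let num_scheduler_fields =
    Async.Scheduler.fold_fields ~init:0
      { Async.Scheduler.folder = (fun acc _t _field -> acc + 1) }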
val is_ready_to_initialize : unit -> bool
val reset_in_forked_process : unit -> unit
If a process that has already created, but not started, the Async scheduler would like to fork, and would like the child to have a clean Async, i.e., not inherit any of the Async work that was done in the parent, it can call reset_in_forked_process at the start of execution in the child process. After that, the child can do Async stuff and then start the Async scheduler.
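A sketch of this fork pattern, assuming Core.Unix.fork and that Async work has been set up in the parent but the scheduler has not yet been started; the child resets and then runs its own Async.

  let () =
    match Core.Unix.fork () with
    | `In_the_child ->
      Async.Scheduler.reset_in_forked_process ();
      (* ... the child's own Async setup goes here ... *)
      Core.never_returns (Async.Scheduler.go ())
    | `In_the_parent _child_pid ->
      Core.never_returns (Async.Scheduler.go ())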
val make_async_unusable : unit -> unit
make_async_unusable ()
makes subsequent attempts to use the Async scheduler raise. One use case for make_async_unusable is if you fork from a process already running the Async scheduler and want to run non-Async OCaml code in the child process, with the guarantee that the child process does not use Async.
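A sketch of that use case, again assuming Core.Unix.fork; run_child is a hypothetical function containing the child's non-Async code.

  let fork_non_async_child ~run_child =
    match Core.Unix.fork () with
    | `In_the_child ->
      Async.Scheduler.make_async_unusable ();
      run_child ();   (* plain, non-Async OCaml code; any Async use raises *)
      exit 0
    | `In_the_parent child_pid -> child_pid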
val add_busy_poller : (unit -> [ `Continue_polling | `Stop_polling of 'a ]) -> 'a Async_unix__.Import.Deferred.t
Async supports "busy polling", which runs a thread that busy loops running user-supplied polling functions. The busy-loop thread is distinct from Async's scheduler thread.
Busy polling is useful for a situation like a shared-memory ringbuffer being used for IPC. One can poll the ringbuffer with a busy poller, and then when data is detected, fill some ivar that causes Async code to handle the data.
add_busy_poller poll
adds poll to the busy loop. poll will be called continuously, once per iteration of the busy loop, until it returns `Stop_polling a, at which point the result of add_busy_poller will become determined. poll will hold the Async lock while running, so it is fine to do ordinary Async operations, e.g., fill an ivar. The busy loop will run an ordinary Async cycle if any of the pollers add jobs.
poll will run in the monitor in effect when add_busy_poller was called; exceptions raised by poll will be sent asynchronously to that monitor. If poll raises, it will still be run on subsequent iterations of the busy loop.
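A sketch of the ringbuffer use case above; Ring_buffer.poll is a hypothetical non-blocking read. The deferred returned by add_busy_poller becomes determined with the first message seen.

  let next_message ring =
    Async.Scheduler.add_busy_poller (fun () ->
      match Ring_buffer.poll ring with   (* hypothetical: None if empty *)
      | None -> `Continue_polling
      | Some msg -> `Stop_polling msg)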
val handle_thread_pool_stuck : (stuck_for:Core.Time_ns.Span.t -> unit) -> unit
handle_thread_pool_stuck f
causes f to run whenever Async detects its thread pool is stuck (i.e., it hasn't completed a job for over a second and has work waiting to start). Async checks every second. By default, if the thread pool has been stuck for less than 60s, Async will eprintf a message; if for more than 60s, Async will send an exception to the main monitor, which will abort the program unless there is a custom handler for the main monitor.
Calling handle_thread_pool_stuck replaces whatever behavior was previously there.
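For example, a sketch that replaces the default behavior with a single line on stderr:

  let () =
    Async.Scheduler.handle_thread_pool_stuck (fun ~stuck_for ->
      Core.eprintf "Async thread pool stuck for %s\n%!"
        (Core.Time_ns.Span.to_string stuck_for))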
val default_handle_thread_pool_stuck : Thread_pool.t -> stuck_for:Core.Time_ns.Span.t -> unit
val time_spent_waiting_for_io : unit -> Core.Time_ns.Span.t
time_spent_waiting_for_io ()
returns the amount of time that the Async scheduler has spent in calls to epoll_wait (or select) since the start of the program.
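A sketch of reporting this counter, e.g. just before shutdown:

  let report_io_wait () =
    Core.printf "time blocked on I/O: %s\n"
      (Core.Time_ns.Span.to_string
         (Async.Scheduler.time_spent_waiting_for_io ()))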
val set_min_inter_cycle_timeout : Core.Time_ns.Span.t -> unit
set_min_inter_cycle_timeout
sets the minimum timeout that the scheduler will pass to the OS when it checks for I/O between cycles. The minimum is zero by default. Setting it to a nonzero value is used to increase thread fairness between the scheduler and other threads. A plausible setting is 10us. This can also be set via the ASYNC_CONFIG environment variable.
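A sketch of the 10us setting mentioned above:

  let () =
    Async.Scheduler.set_min_inter_cycle_timeout (Core.Time_ns.Span.of_int_us 10)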