Module Async_unix__.Raw_scheduler
module Fd = Async_unix__.Raw_fd
module Watching = Fd.Watching
module Signal = Core.Signal
module Timerfd = Core.Linux_ext.Timerfd
module Tsc = Time_stamp_counter
module File_descr_watcher : sig ... end
include Async_unix__.Import.Async_kernel_scheduler
type 'a with_options
= ?monitor:Async_kernel.Monitor.t -> ?priority:Async_kernel.Priority.t -> 'a
val current_execution_context : unit -> Async_kernel.Execution_context.t
val within_context : Async_kernel.Execution_context.t -> (unit -> 'a) -> ('a, unit) Core_kernel.Result.t
within_context context f
runsf ()
right now with the specified execution context. Iff
raises, then the exception is sent to the monitor ofcontext
, andError ()
is returned.
val within' : ((unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t) with_options
within' f ~monitor ~priority
runsf ()
right now, with the specified block group, monitor, and priority set as specified. They will be reset to their original values whenf
returns. Iff
raises, then the result ofwithin'
will never become determined, but the exception will end up in the specified monitor.
val within : ((unit -> unit) -> unit) with_options
within
is likewithin'
, but doesn't require the thunk to return a deferred.
val within_v : ((unit -> 'a) -> 'a option) with_options
within_v
is likewithin
, but allows a value to be returned byf
.
val with_local : 'a Core_kernel.Univ_map.Key.t -> 'a option -> f:(unit -> 'b) -> 'b
with_local key value ~f
, when run in the current execution context,e
, runsf
right now in a new execution context,e'
, that is identical toe
except thatfind_local key = value
. As usual,e'
will be in effect in asynchronous computations started byf
. Whenwith_local
returns, the execution context is restored toe
.
val find_local : 'a Core_kernel.Univ_map.Key.t -> 'a option
find_local key
returns the value associated tokey
in the current execution context.
val schedule' : ((unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t) with_options
Just like
within'
, but instead of running the thunk right now, adds it to the Async queue to be run with other Async jobs.
val schedule : ((unit -> unit) -> unit) with_options
Just like
schedule'
, but doesn't require the thunk to return a deferred.
val enqueue_job : Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit
eneque_job execution_context.t f a
enqueues into the scheduler's job queue a job that will runf a
inexecution_context
.
val thread_safe_enqueue_job : Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit
thread_safe_enqueue_job
is likeenqueue_job
, except it is for use from a thread that doesn't hold the Async lock.
val preserve_execution_context : ('a -> unit) -> ('a -> unit) Core_kernel.Staged.t
preserve_execution_context t f
saves the current execution context and returns a functiong
such thatg a
runsf a
in the saved execution context.g a
becomes determined whenf a
becomes determined.
val preserve_execution_context' : ('a -> 'b Async_kernel.Deferred.t) -> ('a -> 'b Async_kernel.Deferred.t) Core_kernel.Staged.t
val cycle_start : unit -> Core_kernel.Time.t
cycle_start ()
returns the result ofTime.now ()
called at the beginning of cycle.
val cycle_start_ns : unit -> Async_kernel__.Import.Time_ns.t
val cycle_times : unit -> Core_kernel.Time.Span.t Async_kernel__.Async_stream.t
cycle_times ()
returns a stream that is extended with an element at the start of each Async cycle, with the amount of time that the previous cycle took, as determined by calls toTime.now
at the beginning and end of the cycle.
val cycle_times_ns : unit -> Async_kernel__.Import.Time_ns.Span.t Async_kernel__.Async_stream.t
val long_cycles : at_least:Async_kernel__.Import.Time_ns.Span.t -> Async_kernel__.Import.Time_ns.Span.t Async_kernel__.Async_stream.t
long_cycles ~at_least
returns a stream of cycles whose duration is at leastat_least
.long_cycles
is more efficient thancycle_times
because it only allocates a stream entry when there is a long cycle, rather than on every cycle.
val cycle_count : unit -> int
cycle_count ()
returns the total number of Async cycles that have happened.
val total_cycle_time : unit -> Async_kernel__.Import.Time_ns.Span.t
total_cycle_time ()
returns the total (wall) time spent executing jobs in Async cycles.
val event_precision : unit -> Core_kernel.Time.Span.t
The
alarm_precision
of the timing-wheel used to implement Async'sClock
.
val event_precision_ns : unit -> Async_kernel__.Import.Time_ns.Span.t
val force_current_cycle_to_end : unit -> unit
force_current_cycle_to_end ()
causes no more normal priority jobs to run in the current cycle, and for the end-of-cycle jobs (i.e., writes) to run, and then for the cycle to end.
val set_max_num_jobs_per_priority_per_cycle : int -> unit
set_max_num_jobs_per_priority_per_cycle int
sets the maximum number of jobs that will be done at each priority within each Async cycle. The default is500
.max_num_jobs_per_priority_per_cycle
retrieves the current value.
val max_num_jobs_per_priority_per_cycle : unit -> int
val set_record_backtraces : bool -> unit
set_record_backtraces do_record
sets whether Async should keep in the execution context the history of stack backtraces (obtained viaBacktrace.get
) that led to the current job. If an Async job has an unhandled exception, this backtrace history will be recorded in the exception. In particular the history will appear in an unhandled exception that reaches the main monitor. This can have a substantial performance impact, both in running time and space usage.
val yield : unit -> unit Async_kernel.Deferred.t
yield ()
returns a deferred that becomes determined after the current cycle completes. This can be useful to improve fairness byyield
ing within a computation to give other jobs a chance to run.
val yield_until_no_jobs_remain : unit -> unit Async_kernel.Deferred.t
yield_until_no_jobs_remain ()
returns a deferred that becomes determined the next time Async's job queue is empty. This is useful in tests when one needs to wait for the completion of all the jobs based on what's in the queue, when those jobs might create other jobs -- without depending on I/O or the passage of wall-clock time.
val yield_every : n:int -> (unit -> unit Async_kernel.Deferred.t) Core_kernel.Staged.t
yield_every ~n
returns a function that will act asyield
everyn
calls and asreturn ()
the rest of the time. This is useful for improving fairness in circumstances where you don't have good control of the batch size, but can insert a deferred into every iteration.yield_every
raises ifn <= 0
.
val num_jobs_run : unit -> int
num_jobs_run ()
returns the number of jobs that have been run since starting. The returned value includes the currently running job.
val num_pending_jobs : unit -> int
num_pending_jobs
returns the number of jobs that are queued to run by the scheduler.
module Expert = Async_kernel.Async_kernel_scheduler.Expert
module Private = Async_kernel__.Scheduler
type t
=
{
mutex : Nano_mutex.t;
mutable is_running : bool;
mutable have_called_go : bool;
fds_whose_watching_has_changed : Fd.t Core.Stack.t;
file_descr_watcher : File_descr_watcher.t;
mutable time_spent_waiting_for_io : Tsc.Span.t;
fd_by_descr : Async_unix__.Fd_by_descr.t;
mutable timerfd : Core.Linux_ext.Timerfd.t option;
mutable timerfd_set_at : Core.Time_ns.t;
mutable scheduler_thread_id : int;
interruptor : Async_unix__.Interruptor.t;
signal_manager : Async_unix__.Raw_signal_manager.t;
thread_pool : Thread_pool.t;
mutable handle_thread_pool_stuck : Thread_pool.t -> stuck_for:Core.Time_ns.Span.t -> unit;
busy_pollers : Async_unix__.Busy_pollers.t;
mutable busy_poll_thread_is_running : bool;
mutable next_tsc_calibration : Tsc.t;
kernel_scheduler : Async_unix__.Import.Kernel_scheduler.t;
mutable have_lock_do_cycle : (unit -> unit) option;
mutable max_inter_cycle_timeout : Async_unix__.Import.Max_inter_cycle_timeout.t;
mutable min_inter_cycle_timeout : Async_unix__.Import.Min_inter_cycle_timeout.t;
initialized_at : Core.Backtrace.t;
}
val initialized_at : t -> Core.Backtrace.t
val min_inter_cycle_timeout : t -> Async_unix__.Import.Min_inter_cycle_timeout.t
val set_min_inter_cycle_timeout : t -> Async_unix__.Import.Min_inter_cycle_timeout.t -> unit
val max_inter_cycle_timeout : t -> Async_unix__.Import.Max_inter_cycle_timeout.t
val set_max_inter_cycle_timeout : t -> Async_unix__.Import.Max_inter_cycle_timeout.t -> unit
val have_lock_do_cycle : t -> (unit -> unit) option
val set_have_lock_do_cycle : t -> (unit -> unit) option -> unit
val kernel_scheduler : t -> Async_unix__.Import.Kernel_scheduler.t
val next_tsc_calibration : t -> Tsc.t
val set_next_tsc_calibration : t -> Tsc.t -> unit
val busy_poll_thread_is_running : t -> bool
val set_busy_poll_thread_is_running : t -> bool -> unit
val busy_pollers : t -> Async_unix__.Busy_pollers.t
val handle_thread_pool_stuck : t -> Thread_pool.t -> stuck_for:Core.Time_ns.Span.t -> unit
val set_handle_thread_pool_stuck : t -> (Thread_pool.t -> stuck_for:Core.Time_ns.Span.t -> unit) -> unit
val thread_pool : t -> Thread_pool.t
val signal_manager : t -> Async_unix__.Raw_signal_manager.t
val interruptor : t -> Async_unix__.Interruptor.t
val scheduler_thread_id : t -> int
val set_scheduler_thread_id : t -> int -> unit
val timerfd_set_at : t -> Core.Time_ns.t
val set_timerfd_set_at : t -> Core.Time_ns.t -> unit
val timerfd : t -> Core.Linux_ext.Timerfd.t option
val set_timerfd : t -> Core.Linux_ext.Timerfd.t option -> unit
val fd_by_descr : t -> Async_unix__.Fd_by_descr.t
val time_spent_waiting_for_io : t -> Tsc.Span.t
val set_time_spent_waiting_for_io : t -> Tsc.Span.t -> unit
val file_descr_watcher : t -> File_descr_watcher.t
val fds_whose_watching_has_changed : t -> Fd.t Core.Stack.t
val have_called_go : t -> bool
val set_have_called_go : t -> bool -> unit
val is_running : t -> bool
val set_is_running : t -> bool -> unit
val mutex : t -> Nano_mutex.t
module Fields : sig ... end
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
val max_num_threads : t -> int
val max_num_open_file_descrs : t -> int
val current_execution_context : t -> Async_kernel.Execution_context.t
val with_execution_context : t -> Async_kernel.Execution_context.t -> f:(unit -> 'a) -> 'a
val create_fd : ?avoid_nonblock_if_possible:bool -> t -> Fd.Kind.t -> Fd.File_descr.t -> Core.Info.t -> Fd.t
val thread_pool_cpu_affinity : t -> Thread_pool.Cpu_affinity.t
val lock : t -> unit
val try_lock : t -> bool
val unlock : t -> unit
val with_lock : t -> (unit -> 'a) -> 'a
val am_holding_lock : t -> bool
type the_one_and_only
=
|
Not_ready_to_initialize
|
Ready_to_initialize of unit -> t
|
Initialized of t
val mutex_for_initializing_the_one_and_only_ref : Nano_mutex.t
val the_one_and_only_ref : the_one_and_only Core.ref
val is_ready_to_initialize : unit -> bool
val the_one_and_only_uncommon_case : should_lock:bool -> t
val the_one_and_only : should_lock:bool -> t
val current_thread_id : unit -> int
val is_main_thread : unit -> bool
val remove_fd : t -> Async_unix__.Raw_fd.t -> unit
val maybe_start_closing_fd : t -> Fd.t -> unit
val dec_num_active_syscalls_fd : t -> Fd.t -> unit
val invariant : t -> unit
val update_check_access : t -> bool -> unit
val try_create_timerfd : unit -> Timerfd.t option
val default_handle_thread_pool_stuck : Thread_pool.t -> stuck_for:Core.Time_ns.Span.t -> Core_kernel__.Import.unit
val detect_stuck_thread_pool : t -> unit
val thread_safe_wakeup_scheduler : t -> unit
val i_am_the_scheduler : t -> bool
val set_fd_desired_watching : t -> Fd.t -> Async_unix__.Read_write.Key.t -> Fd.Watching.t -> unit
val request_start_watching : t -> Fd.t -> Async_unix__.Read_write.Key.t -> Fd.Watching.t -> [> `Already_closed | `Already_watching | `Unsupported | `Watching ]
val request_stop_watching : t -> Fd.t -> Async_unix__.Read_write.Key.t -> Fd.ready_to_result -> unit
val post_check_got_timerfd : Async_unix__.Import.File_descr.t -> 'a
val post_check_invalid_fd : Async_unix__.Import.File_descr.t -> 'a
val post_check_handle_fd : t -> Async_unix__.Import.File_descr.t -> Async_unix__.Read_write.Key.t -> Fd.ready_to_result -> unit
val create : ?thread_pool_cpu_affinity:Thread_pool.Cpu_affinity.t -> ?file_descr_watcher:Async_unix__.Config.File_descr_watcher.t -> ?max_num_open_file_descrs:Async_unix__.Config.Max_num_open_file_descrs.t -> ?max_num_threads:Async_unix__.Config.Max_num_threads.t -> unit -> t
val init : unit -> unit
val reset_in_forked_process : unit -> unit
val thread_safe_reset : unit -> unit
val make_async_unusable : unit -> unit
val thread_safe_enqueue_external_job : t -> Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit
val have_lock_do_cycle : t -> unit
val log_sync_changed_fds_to_file_descr_watcher : t -> Async_unix__.Import.File_descr.t -> Core_kernel__.Import.bool Async_unix__.Read_write.t -> unit
val sync_changed_fd_failed : t -> Fd.t -> Core_kernel__.Import.bool Async_unix__.Read_write.t -> Core_kernel__.Import.Exn.t -> 'a
val sync_changed_fds_to_file_descr_watcher : t -> unit
val maybe_calibrate_tsc : t -> unit
val create_job : ?execution_context:Async_kernel.Execution_context.t -> t -> ('a -> unit) -> 'a -> Async_kernel__.Job.t
val dump_core_on_job_delay : unit -> unit
val init : t -> unit
val check_file_descr_watcher : t -> timeout:'a Async_unix__.File_descr_watcher_intf.Timeout.t -> 'a -> unit
val compute_timeout_and_check_file_descr_watcher : t -> unit
val one_iter : t -> unit
val be_the_scheduler : ?raise_unhandled_exn:bool -> t -> 'a
val add_finalizer : t -> 'a Core_kernel.Heap_block.t -> ('a Core_kernel.Heap_block.t -> unit) -> unit
val add_finalizer_exn : t -> 'a -> ('a -> unit) -> unit
val set_task_id : unit -> unit
val go : ?raise_unhandled_exn:bool -> unit -> Core__.Import.never_returns
val go_main : ?raise_unhandled_exn:bool -> ?file_descr_watcher:Async_unix__.Config.File_descr_watcher.t -> ?max_num_open_file_descrs:int -> ?max_num_threads:int -> main:(unit -> unit) -> unit -> Core__.Import.never_returns
val is_running : unit -> bool
val report_long_cycle_times : ?cutoff:Core.Time.Span.t -> unit -> unit
val set_check_invariants : bool -> unit
val set_detect_invalid_access_from_thread : bool -> unit
val set_max_inter_cycle_timeout : Core_kernel__.Span_float.t -> unit
val start_busy_poller_thread_if_not_running : t -> unit
val add_busy_poller : (unit -> [ `Continue_polling | `Stop_polling of 'a ]) -> 'a Async_unix__.Import.Deferred.t
type 'b folder
=
{
folder : a. 'b -> t -> (t, 'a) Core.Field.t -> 'b;
}
val t : unit -> t
val fold_fields : init:'a -> 'a folder -> 'a
val handle_thread_pool_stuck : (stuck_for:Core.Time_ns.Span.t -> unit) -> unit