Module Async_unix__.Scheduler
type t= Async_unix__.Raw_scheduler.t
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
val t : unit -> tt ()returns the Async scheduler. If the scheduler hasn't been created yet, this will create it and acquire the Async lock.
val max_num_open_file_descrs : unit -> intval max_num_threads : unit -> intval go : ?raise_unhandled_exn:bool -> unit -> Core.never_returnsgo ?raise_unhandled_exn ()passes control to Async, at which point Async starts running handlers, one by one without interruption, until there are no more handlers to run. When Async is out of handlers, it blocks until the outside world schedules more of them. Because of this, Async programs do not exit untilshutdownis called.go ()callshandle_signal Sys.sigpipe, which causes the SIGPIPE signal to be ignored. Low-level syscalls (e.g., write) still raise EPIPE.If any Async job raises an unhandled exception that is not handled by any monitor, Async execution ceases. Then, by default, Async pretty prints the exception, and exits with status 1. If you don't want this, pass
~raise_unhandled_exn:true, which will cause the unhandled exception to be raised to the caller ofgo ().
val go_main : ?raise_unhandled_exn:bool -> ?file_descr_watcher:Async_unix__.Config.File_descr_watcher.t -> ?max_num_open_file_descrs:int -> ?max_num_threads:int -> main:(unit -> unit) -> unit -> Core.never_returnsgo_mainis likego, except that you supply amainfunction that will be run to initialize the Async computation, and thatgo_mainwill fail if any Async has been used prior togo_mainbeing called. Moreover it allows you to configure more static options of the scheduler.
type 'a with_options= ?monitor:Async_unix__.Import.Monitor.t -> ?priority:Async_unix__.Import.Priority.t -> 'a
val current_execution_context : unit -> Async_unix__.Import.Execution_context.tval within_context : Async_unix__.Import.Execution_context.t -> (unit -> 'a) -> ('a, unit) Core.Result.twithin_context context frunsf ()right now with the specified execution context. Iffraises, then the exception is sent to the monitor ofcontext, andError ()is returned.
val within' : ((unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t) with_optionswithin' f ~monitor ~priorityrunsf ()right now, with the specified block group, monitor, and priority set as specified. They will be reset to their original values whenfreturns. Iffraises, then the result ofwithin'will never become determined, but the exception will end up in the specified monitor.
val within : ((unit -> unit) -> unit) with_optionswithinis likewithin', but doesn't require the thunk to return a deferred.
val within_v : ((unit -> 'a) -> 'a option) with_optionswithin_vis likewithin, but allows a value to be returned byf.
val with_local : 'a Core.Univ_map.Key.t -> 'a option -> f:(unit -> 'b) -> 'bwith_local key value ~f, when run in the current execution context,e, runsfright now in a new execution context,e', that is identical toeexcept thatfind_local key = value. As usual,e'will be in effect in asynchronous computations started byf. Whenwith_localreturns, the execution context is restored toe.
val find_local : 'a Core.Univ_map.Key.t -> 'a optionfind_local keyreturns the value associated tokeyin the current execution context.
val schedule' : ((unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t) with_optionsJust like
within', but instead of running the thunk right now, adds it to the Async queue to be run with other Async jobs.
val schedule : ((unit -> unit) -> unit) with_optionsJust like
schedule', but doesn't require the thunk to return a deferred.
val preserve_execution_context : ('a -> unit) -> ('a -> unit) Core.Staged.tpreserve_execution_context t fsaves the current execution context and returns a functiongsuch thatg arunsf ain the saved execution context.g abecomes determined whenf abecomes determined.
val preserve_execution_context' : ('a -> 'b Async_unix__.Import.Deferred.t) -> ('a -> 'b Async_unix__.Import.Deferred.t) Core.Staged.tval cycle_start : unit -> Core.Time.tcycle_start ()returns the result ofTime.now ()called at the beginning of cycle.
val cycle_start_ns : unit -> Core.Time_ns.tval cycle_times : unit -> Core.Time.Span.t Async_unix__.Import.Stream.tcycle_times ()returns a stream that is extended with an element at the start of each Async cycle, with the amount of time that the previous cycle took, as determined by calls toTime.nowat the beginning and end of the cycle.
val cycle_times_ns : unit -> Core.Time_ns.Span.t Async_unix__.Import.Stream.tval long_cycles : at_least:Core.Time_ns.Span.t -> Core.Time_ns.Span.t Async_unix__.Import.Stream.tlong_cycles ~at_leastreturns a stream of cycles whose duration is at leastat_least.long_cyclesis more efficient thancycle_timesbecause it only allocates a stream entry when there is a long cycle, rather than on every cycle.
val report_long_cycle_times : ?cutoff:Core.Time.Span.t -> unit -> unitreport_long_cycle_times ?cutoff ()sets up something that will print a warning to stderr whenever there is an Async cycle that is too long, as specified bycutoff, whose default is 1s.
val cycle_count : unit -> intcycle_count ()returns the total number of Async cycles that have happened.
val total_cycle_time : unit -> Core.Time_ns.Span.ttotal_cycle_time ()returns the total (wall) time spent executing jobs in Async cycles.
val event_precision : unit -> Core.Time.Span.tThe
alarm_precisionof the timing-wheel used to implement Async'sClock.
val event_precision_ns : unit -> Core.Time_ns.Span.tval force_current_cycle_to_end : unit -> unitforce_current_cycle_to_end ()causes no more normal priority jobs to run in the current cycle, and for the end-of-cycle jobs (i.e., writes) to run, and then for the cycle to end.
val set_max_num_jobs_per_priority_per_cycle : int -> unitset_max_num_jobs_per_priority_per_cycle intsets the maximum number of jobs that will be done at each priority within each Async cycle. The default is500.max_num_jobs_per_priority_per_cycleretrieves the current value.
val max_num_jobs_per_priority_per_cycle : unit -> intval set_max_inter_cycle_timeout : Core.Time.Span.t -> unitset_max_inter_cycle_timeout spansets the maximum amount of time the scheduler will remain blocked (on epoll or select) between cycles.
val set_check_invariants : bool -> unitset_check_invariants do_checksets whether Async should check invariants of its internal data structures.set_check_invariants truecan substantially slow down your program.
val set_detect_invalid_access_from_thread : bool -> unitset_detect_invalid_access_from_thread do_checksets whether Async routines should check if they are being accessed from some thread other than the thread currently holding the Async lock, which is not allowed and can lead to very confusing behavior.
val set_record_backtraces : bool -> unitset_record_backtraces do_recordsets whether Async should keep in the execution context the history of stack backtraces (obtained viaBacktrace.get) that led to the current job. If an Async job has an unhandled exception, this backtrace history will be recorded in the exception. In particular the history will appear in an unhandled exception that reaches the main monitor. This can have a substantial performance impact, both in running time and space usage.
type 'b folder={folder : a. 'b -> t -> (t, 'a) Core.Field.t -> 'b;}
val fold_fields : init:'b -> 'b folder -> 'bfold_fields ~init folderfoldsfolderover each field in the scheduler. The fields themselves are not exposed --foldermust be a polymorphic function that can work on any field. So, it's only useful for generic operations, e.g., getting the size of each field.
val is_ready_to_initialize : unit -> boolval reset_in_forked_process : unit -> unitIf a process that has already created, but not started, the Async scheduler would like to fork, and would like the child to have a clean Async, i.e., not inherit any of the Async work that was done in the parent, it can call
reset_in_forked_processat the start of execution in the child process. After that, the child can do Async stuff and then start the Async scheduler.
val make_async_unusable : unit -> unitmake_async_unusable ()makes subsequent attempts to use the Async scheduler raise. One use case formake_async_unusableis if you fork from a process already running the Async scheduler, and want to run non-Async OCaml code in the child process, with the guarantee that the child process does not use Async.
val add_busy_poller : (unit -> [ `Continue_polling | `Stop_polling of 'a ]) -> 'a Async_unix__.Import.Deferred.tAsync supports "busy polling", which runs a thread that busy loops running user-supplied polling functions. The busy-loop thread is distinct from Async's scheduler thread.
Busy polling is useful for a situation like a shared-memory ringbuffer being used for IPC. One can poll the ringbuffer with a busy poller, and then when data is detected, fill some ivar that causes Async code to handle the data.
add_busy_poller polladdspollto the busy loop.pollwill be called continuously, once per iteration of the busy loop, until it returns`Stop_polling aat which point the result ofadd_busy_pollerwill become determined.pollwill hold the Async lock while running, so it is fine to do ordinary Async operations, e.g., fill an ivar. The busy loop will run an ordinary Async cycle if any of the pollers add jobs.pollwill run in the monitor in effect whenadd_busy_pollerwas called; exceptions raised bypollwill be sent asynchronously to that monitor. Ifpollraises, it will still be run on subsequent iterations of the busy loop.
val handle_thread_pool_stuck : (stuck_for:Core.Time_ns.Span.t -> unit) -> unithandle_thread_pool_stuck fcausesfto run whenever Async detects its thread pool is stuck (i.e., hasn't completed a job for over a second and has work waiting to start). Async checks every second. By default, if the thread pool has been stuck for less than 60s, Async willeprintfa message. If more than 60s, Async will send an exception to the main monitor, which will abort the program unless there is a custom handler for the main monitor.Calling
handle_thread_pool_stuckreplaces whatever behavior was previously there.
val default_handle_thread_pool_stuck : Thread_pool.t -> stuck_for:Core.Time_ns.Span.t -> unitval yield : unit -> unit Async_unix__.Import.Deferred.tyield ()returns a deferred that becomes determined after the current cycle completes. This can be useful to improve fairness byyielding within a computation to give other jobs a chance to run.
val yield_until_no_jobs_remain : unit -> unit Async_unix__.Import.Deferred.tyield_until_no_jobs_remain ()returns a deferred that becomes determined the next time Async's job queue is empty. This is useful in tests when one needs to wait for the completion of all the jobs based on what's in the queue, when those jobs might create other jobs -- without depending on I/O or the passage of wall-clock time.
val yield_every : n:int -> (unit -> unit Async_unix__.Import.Deferred.t) Core.Staged.tyield_every ~nreturns a function that will act asyieldeveryncalls and asreturn ()the rest of the time. This is useful for improving fairness in circumstances where you don't have good control of the batch size, but can insert a deferred into every iteration.yield_everyraises ifn <= 0.
val time_spent_waiting_for_io : unit -> Core.Time_ns.Span.ttime_spent_waiting_for_io ()returns the amount of time that the Async scheduler has spent in calls toepoll_wait(orselect) since the start of the program.
val set_min_inter_cycle_timeout : Core.Time_ns.Span.t -> unitset_min_inter_cycle_timeoutsets the minimum timeout that the scheduler will pass to the OS when it checks for I/O between cycles. The minimum is zero by default. Setting it to a nonzero value is used to increase thread fairness between the scheduler and other threads. A plausible setting is 10us. This can also be set via theASYNC_CONFIGenvironment variable.
val num_jobs_run : unit -> intnum_jobs_run ()returns the number of jobs that have been run since starting. The returned value includes the currently running job.
val num_pending_jobs : unit -> intnum_pending_jobsreturns the number of jobs that are queued to run by the scheduler.
module Expert : sig ... end