Module Async_extra.Schedule_v5

Extends Core.Schedule_v5 with functions for asynchronously handling events in the schedule.

include module type of sig ... end
type zoned = Core__Schedule_v5.zoned =
| Zoned
val compare_zoned : zoned -> zoned -> int
type unzoned = Core__Schedule_v5.unzoned =
| Unzoned
val compare_unzoned : unzoned -> unzoned -> int
type ('a, 'b) t = ('a, 'b) Core__Schedule_v5.t =
| In_zone : Core__.Import_time.Time.Zone.t * (unzoned, 'b0) t -> (zoned, 'b0) t
| Tag : 'b1 * ('a0, 'b1) t -> ('a0, 'b1) t
| And : ('a1, 'b2) t list -> ('a1, 'b2) t
| Or : ('a2, 'b3) t list -> ('a2, 'b3) t
| Not : ('a3, 'b4) t -> ('a3, 'b4) t
| If_then_else : (('a4, 'b5) t * ('a4, 'b5) t * ('a4, 'b5) t) -> ('a4, 'b5) t
| Shift : Core__.Import_time.Time.Span.t * ('a5, 'b6) t -> ('a5, 'b6) t
| Between : (Inclusive_exclusive.t * Core__.Import_time.Time.Ofday.t) * (Inclusive_exclusive.t * Core__.Import_time.Time.Ofday.t) -> (unzoned, 'b7) t
| Zoned_between : (Inclusive_exclusive.t * Core__.Import_time.Time.Zone.t * Core__.Import_time.Time.Ofday.t) * (Inclusive_exclusive.t * Core__.Import_time.Time.Zone.t * Core__.Import_time.Time.Ofday.t) -> (zoned, 'b8) t
| At : Core__.Import_time.Time.Ofday.t list -> (unzoned, 'b9) t
| Secs : int list -> (unzoned, 'b10) t
| Mins : int list -> (unzoned, 'b11) t
| Hours : int list -> (unzoned, 'b12) t
| Weekdays : Core__.Import.Day_of_week.t list -> (unzoned, 'b13) t
| Days : int list -> (unzoned, 'b14) t
| Weeks : int list -> (unzoned, 'b15) t
| Months : Core__.Import.Month.t list -> (unzoned, 'b16) t
| On : Core__.Import.Date.t list -> (unzoned, 'b17) t
| Before : (Inclusive_exclusive.t * (Core__.Import.Date.t * Core__.Import_time.Time.Ofday.t)) -> (unzoned, 'b18) t
| After : (Inclusive_exclusive.t * (Core__.Import.Date.t * Core__.Import_time.Time.Ofday.t)) -> (unzoned, 'b19) t
| Always : ('a6, 'b20) t
| Never : ('a7, 'b21) t
val compare : ('a -> 'a -> int) -> ('b -> 'b -> int) -> ('a, 'b) t -> ('a, 'b) t -> int
type 'b zoned_t = (zoned, 'b) t
val sexp_of_zoned_t : ('b -> Base.Sexp.t) -> 'b zoned_t -> Base.Sexp.t
val to_string_zoned : (zoned, 'b) t -> string_of_tag:('b -> string) -> string
val includes : (zoned, 'b) t -> Core__.Import_time.Time.t -> bool
val tags : (zoned, 'tag) t -> Core__.Import_time.Time.t -> [ `Included of 'tag list | `Not_included ]
val all_tags : (zoned, 'tag) t -> tag_comparator:('tag, 'cmp) Core__.Import.Comparator.t -> ('tag, 'cmp) Core__.Import.Set.t
val fold_tags : (zoned, 'tag) t -> init:'m -> f:('m -> 'tag -> 'm) -> Core__.Import_time.Time.t -> 'm option
val map_tags : ('a, 'b) t -> f:('b -> 'c) -> ('a, 'c) t
type ('tag, 'a) emit = ('tag, 'a) Core__Schedule_v5.emit =
| Transitions : ('tag0, [ `Enter of Core__.Import_time.Time.t * 'tag0 list | `Leave of Core__.Import_time.Time.t | `No_change_until_at_least of [ `In_range | `Out_of_range ] * Core__.Import_time.Time.t ]) emit
| Transitions_and_tag_changes : ('tag1 -> 'tag1 -> bool) -> ('tag1, [ `Change_tags of Core__.Import_time.Time.t * 'tag1 list | `Enter of Core__.Import_time.Time.t * 'tag1 list | `Leave of Core__.Import_time.Time.t | `No_change_until_at_least of [ `In_range | `Out_of_range ] * Core__.Import_time.Time.t ]) emit
val to_endless_sequence : (zoned, 'tag) t -> start_time:Core__.Import_time.Time.t -> emit:('tag, 'a) emit -> [ `Started_in_range of 'tag list * 'a Core__.Import.Sequence.t | `Started_out_of_range of 'a Core__.Import.Sequence.t ]
val next_enter_between : (zoned, 'tag) t -> Core__.Import_time.Time.t -> Core__.Import_time.Time.t -> Core__.Import_time.Time.t option
val next_leave_between : (zoned, 'tag) t -> Core__.Import_time.Time.t -> Core__.Import_time.Time.t -> Core__.Import_time.Time.t option
type ('tag, 'output) pipe_emit =
| Transitions : ('tag, 'tag Event.transition) pipe_emit
| Transitions_and_tag_changes : ('tag -> 'tag -> bool) -> ('tag, [ 'tag Event.transition | 'tag Event.tag_change ]) pipe_emit

In Transitions_and_tag_changes, equality for the tag type must be given.

val to_pipe : (zoned, 'tag) t -> start_time:Core.Time.t -> emit:('tag, 'output) pipe_emit -> ?time_source:Async_extra__.Import.Time_source.t -> unit -> [ `Started_in_range of 'tag list * 'output Async_extra__.Import.Pipe.Reader.t | `Started_out_of_range of 'output Async_extra__.Import.Pipe.Reader.t ]

to_pipe t ~start_time ~emit ?time_source () produces a pipe containing the events from to_endless_sequence t ~start_time ~emit, with `No_change_until_at_least filtered out and each event written to the pipe only at or after its scheduled time.
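
As a rough illustration of consuming such a pipe, the sketch below prints every transition. The schedule value, the tag string and the name print_transitions are hypothetical. The `Enter and `Leave cases assume that 'tag Event.transition carries the same payloads as the corresponding cases of emit above, and Time.Zone.utc is assumed to be available from Core.

```ocaml
open Core
open Async
module Schedule = Async_extra.Schedule_v5

(* Hypothetical schedule: minute 0 of every hour (UTC), tagged with a string. *)
let schedule : (Schedule.zoned, string) Schedule.t =
  Schedule.In_zone (Time.Zone.utc, Schedule.Tag ("top-of-hour", Schedule.Mins [ 0 ]))

(* Print each transition as it arrives. The returned deferred only becomes
   determined if the pipe is closed. *)
let print_transitions () : unit Deferred.t =
  match
    Schedule.to_pipe schedule ~start_time:(Time.now ()) ~emit:Schedule.Transitions ()
  with
  | `Started_in_range (_, reader) | `Started_out_of_range reader ->
    Pipe.iter_without_pushback reader ~f:(function
      | `Enter (time, tags) ->
        printf "enter %s [%s]\n" (Time.to_string time) (String.concat ~sep:"," tags)
      | `Leave time -> printf "leave %s\n" (Time.to_string time))
```

Both start cases are handled the same way here; a caller that cares whether the schedule was already in range at start_time would inspect the tag list carried by `Started_in_range.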

val next_event : (zoned, 'tag) t -> event:[ `Enter | `Leave ] -> stop:unit Async_extra__.Import.Deferred.t -> ?time_source:Async_extra__.Import.Time_source.t -> ?after:Core.Time.t -> unit -> Core.Time.t Async_extra__.Import.Deferred.t

next_event t ~event ~stop ~after () waits for the next event in t matching event, searching from time after. When that time arrives, the resulting deferred becomes determined with the time of the event.

If stop becomes determined before the next event, the resulting deferred is never filled and the computation to find the next event stops. If the caller does not intend to use the returned deferred, stop should be filled; otherwise the background computation keeps the deferred alive until the event occurs.

This function is a good choice for handling a single event during the run of a program, like scheduling shutdown. If the intention is to follow along with all events in a schedule, it is preferable to call to_pipe or to_endless_sequence (in the non-async module).
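
The shutdown use case mentioned above might look like the following sketch. The schedule and the name shutdown_when_schedule_ends are hypothetical, and the Inclusive and Exclusive constructors of Inclusive_exclusive.t are an assumption, since that type is not spelled out in this section.

```ocaml
open Core
open Async
module Schedule = Async_extra.Schedule_v5

(* Hypothetical schedule: weekdays from 09:00 (inclusive) to 17:00 (exclusive), UTC. *)
let schedule : (Schedule.zoned, unit) Schedule.t =
  Schedule.In_zone
    ( Time.Zone.utc
    , Schedule.And
        [ Schedule.Weekdays Day_of_week.[ Mon; Tue; Wed; Thu; Fri ]
        ; Schedule.Between
            ( (Schedule.Inclusive_exclusive.Inclusive, Time.Ofday.create ~hr:9 ())
            , (Schedule.Inclusive_exclusive.Exclusive, Time.Ofday.create ~hr:17 ()) )
        ] )

(* Shut the program down at the next `Leave. The returned ivar can be filled
   to abandon the wait, which also stops the background search. *)
let shutdown_when_schedule_ends () : unit Ivar.t =
  let stop = Ivar.create () in
  let next_leave : Time.t Deferred.t =
    Schedule.next_event schedule ~event:`Leave ~stop:(Ivar.read stop) ()
  in
  upon next_leave (fun time ->
    printf "schedule left at %s; shutting down\n" (Time.to_string time);
    shutdown 0);
  stop
```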

type 'a every_enter_callback = enter:Core.Time.t -> leave:Core.Time.t Async_extra__.Import.Deferred.t -> 'a
val every_enter_without_pushback : (zoned, _) t -> ?time_source:Async_extra__.Import.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_extra__.Import.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> unit every_enter_callback -> unit

every_enter_without_pushback t ~start ~stop ~continue_on_error ~start_in_range_is_enter f calls f for each contiguous block of time in t starting at start and continuing until stop becomes determined.

For each block of time with start time enter and end time leave_time, f is called with f ~enter ~leave, where leave is a deferred that becomes determined at leave_time with the value leave_time.

If includes t start && start_in_range_is_enter, then f will be called as soon as possible after start with enter = start. Otherwise, f will not be called for any block of time that includes start.

If continue_on_error = false and f (or any async job started by f) raises an error, f will no longer be called, and all undetermined leave deferreds will remain unfulfilled.

If stop is fulfilled then no further calls to f will be made and all undetermined leave deferreds will remain unfulfilled.
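
A minimal sketch of the callback protocol described above, assuming a hypothetical schedule covering hours 9, 10 and 11 in UTC. The name log_blocks is illustrative only.

```ocaml
open Core
open Async
module Schedule = Async_extra.Schedule_v5

(* Hypothetical schedule: hours 9, 10 and 11 of every day, UTC. *)
let schedule : (Schedule.zoned, unit) Schedule.t =
  Schedule.In_zone (Time.Zone.utc, Schedule.Hours [ 9; 10; 11 ])

(* Log the start of each block, then log its end once [leave] is determined.
   Filling [stop] prevents further calls to the callback. *)
let log_blocks ~(stop : unit Deferred.t) : unit =
  Schedule.every_enter_without_pushback
    schedule
    ~stop
    ~continue_on_error:false
    ~start_in_range_is_enter:true
    (fun ~enter ~leave ->
      printf "block entered at %s\n" (Time.to_string enter);
      upon leave (fun leave_time ->
        printf "block left at %s\n" (Time.to_string leave_time)))
```

Because stop can be fulfilled while a block is still open, code attached to leave should not be relied on for cleanup that must always run.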

val every_enter : (zoned, _) t -> ?time_source:Async_extra__.Import.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_extra__.Import.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> ?on_pushback:unit every_enter_callback -> unit Async_extra__.Import.Deferred.t every_enter_callback -> unit

Like every_enter_without_pushback, except allows at most one call of f to be in flight at a time. If the schedule would cause f to be invoked again before the prior call has finished, then it invokes on_pushback instead (if provided).
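
To make the pushback behaviour concrete, the sketch below uses a hypothetical schedule that enters at minutes 0 and 30 of every hour and a stand-in job that takes 45 minutes. Under those assumptions the block at minute 30 should find the previous job still running and go to on_pushback instead. The name run_jobs and the durations are illustrative.

```ocaml
open Core
open Async
module Schedule = Async_extra.Schedule_v5

(* Hypothetical schedule: minutes 0 and 30 of every hour, UTC. *)
let schedule : (Schedule.zoned, unit) Schedule.t =
  Schedule.In_zone (Time.Zone.utc, Schedule.Mins [ 0; 30 ])

let run_jobs () : unit =
  Schedule.every_enter
    schedule
    ~on_pushback:(fun ~enter ~leave:_ ->
      printf "previous job still running; skipped block at %s\n" (Time.to_string enter))
    (fun ~enter ~leave:_ ->
      printf "job started at %s\n" (Time.to_string enter);
      (* Stand-in for real work: longer than the gap between two blocks. *)
      Clock.after (Time.Span.of_min 45.))
```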

val every_tag_change_without_pushback : (zoned, 'tag) t -> ?time_source:Async_extra__.Import.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_extra__.Import.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> tag_equal:('tag -> 'tag -> bool) -> (tags:'tag list -> unit every_enter_callback) -> unit

every_tag_change_without_pushback t ~tag_equal f calls f for each contiguous block of time in t during which the set of tags in effect remains stable (according to tag_equal).

Moving from a range where the schedule is not in effect to one where it is in effect with no tags is considered a tag change.

For each block of time tagged with tags and start time enter and end time leave_time, f is called with f ~tags ~enter ~leave, where leave is a deferred that becomes determined at leave_time with the value leave_time.

stop, continue_on_error, and start_in_range_is_enter act as documented in every_enter.
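
The following sketch shows one way the tagged callback could be used. The schedule, the tag strings, the helper hours_between and the name log_tag_changes are all hypothetical, and the Inclusive and Exclusive constructors of Inclusive_exclusive.t are assumed.

```ocaml
open Core
open Async
module Schedule = Async_extra.Schedule_v5

(* Hypothetical helper: [start_hr, end_hr) on every day. *)
let hours_between start_hr end_hr =
  Schedule.Between
    ( (Schedule.Inclusive_exclusive.Inclusive, Time.Ofday.create ~hr:start_hr ())
    , (Schedule.Inclusive_exclusive.Exclusive, Time.Ofday.create ~hr:end_hr ()) )

(* Two adjacent ranges with different tags, so the tag set changes at 12:00 UTC. *)
let schedule : (Schedule.zoned, string) Schedule.t =
  Schedule.In_zone
    ( Time.Zone.utc
    , Schedule.Or
        [ Schedule.Tag ("morning", hours_between 9 12)
        ; Schedule.Tag ("afternoon", hours_between 12 17)
        ] )

let log_tag_changes () : unit =
  Schedule.every_tag_change_without_pushback
    schedule
    ~tag_equal:String.equal
    (fun ~tags ~enter ~leave ->
      let tags = String.concat ~sep:"," tags in
      printf "%s: tags now [%s]\n" (Time.to_string enter) tags;
      upon leave (fun (_ : Time.t) -> printf "tags [%s] ended\n" tags))
```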

val every_tag_change : (zoned, 'tag) t -> ?time_source:Async_extra__.Import.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_extra__.Import.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> ?on_pushback:(tags:'tag list -> unit every_enter_callback) -> tag_equal:('tag -> 'tag -> bool) -> (tags:'tag list -> unit Async_extra__.Import.Deferred.t every_enter_callback) -> unit

Like every_tag_change_without_pushback, but pushes back in the same manner as every_enter.