Module Async_extra.Bus

An Async extension of Core_kernel.Bus. Functions that share the same name and types as those in Core_kernel.Bus are direct calls to same.

include module type of sig ... end
type ('callback, 'phantom) t = ('callback'phantomCore_kernel__Bus.t
val sexp_of_t : ('callback ‑> Base.Sexp.t) ‑> ('phantom ‑> Base.Sexp.t) ‑> ('callback'phantomt ‑> Base.Sexp.t
type ('callback, 'phantom) bus = ('callback'phantomt
val read_only : ('callback'at ‑> 'callback Read_only.t
val create : ?⁠name:Core_kernel.Info.t ‑> Core_kernel.Source_code_position.t ‑> 'callback Callback_arity.t ‑> on_subscription_after_first_write:On_subscription_after_first_write.t ‑> on_callback_raise:(Core_kernel.Error.t ‑> Core_kernel__.Import.unit) ‑> 'callback Read_write.t
val callback_arity : ('callback'at ‑> 'callback Callback_arity.t
val num_subscribers : ('a'bt ‑> Core_kernel__.Import.int
val is_closed : ('a'bt ‑> Core_kernel__.Import.bool
val close : 'callback Read_write.t ‑> Core_kernel__.Import.unit
val write2 : ('a ‑> 'b ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> Core_kernel__.Import.unit
val write3 : ('a ‑> 'b ‑> 'c ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> 'c ‑> Core_kernel__.Import.unit
val write4 : ('a ‑> 'b ‑> 'c ‑> 'd ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> 'c ‑> 'd ‑> Core_kernel__.Import.unit
val subscribe_exn : ?⁠extract_exn:Core_kernel__.Import.bool ‑> ?⁠on_callback_raise:(Core_kernel.Error.t ‑> Core_kernel__.Import.unit) ‑> ?⁠on_close:(Core_kernel__.Import.unit ‑> Core_kernel__.Import.unit) ‑> 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> f:'callback ‑> 'callback Subscriber.t
val iter_exn : 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> f:'callback ‑> Core_kernel__.Import.unit
val fold_exn : 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> ('callback'f'sFold_arity.t ‑> init:'s ‑> f:'f ‑> Core_kernel__.Import.unit
val unsubscribe : 'callback Read_only.t ‑> 'callback Subscriber.t ‑> Core_kernel__.Import.unit
val pipe1_exn : ('a ‑> unit) Read_only.t ‑> Core.Source_code_position.t ‑> 'a Async_extra__.Import.Pipe.Reader.t

pipe1_exn t returns a pipe of updates from t by subscribing to t. Closing the pipe unsubscribes from t. Closing t closes the pipe. pipe1_exn raises in the same circumstances as subscribe_exn.

module First_arity : sig ... end
val first_exn : ?⁠stop:unit Async_extra__.Import.Deferred.t ‑> 'c Read_only.t ‑> Core.Source_code_position.t ‑> ('c'f'rFirst_arity.t ‑> f:'f ‑> 'r Async_extra__.Import.Deferred.t

first_exn here t arity ~f returns a deferred that becomes determined with value r when the first event is published to t where f returns Some r. first_exn then unsubscribes from t, ensuring that f is never called again after it returns Some. first_exn raises if it can't subscribe to the bus, i.e., if subscribe_exn raises. If f raises, then first_exn raises to the monitor in effect when first_exn was called. first_exn takes time proportional to the number of bus subscribers.

If stop is provided and becomes determined, f will not be called again, it will unsubscribe from the bus, and the deferred that was returned by first_exn will never become determined.