An Async extension of Core_kernel.Bus. Functions that share the same name and types as those in Core_kernel.Bus are direct calls to the same.
include module type of sig ... end
val sexp_of_t : ('callback ‑> Sexplib.Sexp.t) ‑> ('phantom ‑> Sexplib.Sexp.t) ‑> ('callback, 'phantom) t ‑> Sexplib.Sexp.t
module Read_write = Core_kernel__Bus.Read_write
module Read_only = Core_kernel__Bus.Read_only
val read_only : ('callback, 'a) t ‑> 'callback Read_only.t
val create : ?name:Core_kernel.Info.t ‑> Core_kernel.Source_code_position.t ‑> 'callback Callback_arity.t ‑> on_subscription_after_first_write:On_subscription_after_first_write.t ‑> on_callback_raise:(Core_kernel.Error.t ‑> Core_kernel__.Import.unit) ‑> 'callback Read_write.t
val callback_arity : ('callback, 'a) t ‑> 'callback Callback_arity.t
val num_subscribers : ('a, 'b) t ‑> Core_kernel__.Import.int
val is_closed : ('a, 'b) t ‑> Core_kernel__.Import.bool
val close : 'callback Read_write.t ‑> Core_kernel__.Import.unit
val write : ('a ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> Core_kernel__.Import.unit
val write2 : ('a ‑> 'b ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> Core_kernel__.Import.unit
val write3 : ('a ‑> 'b ‑> 'c ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> 'c ‑> Core_kernel__.Import.unit
val write4 : ('a ‑> 'b ‑> 'c ‑> 'd ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> 'c ‑> 'd ‑> Core_kernel__.Import.unit
module Subscriber = Core_kernel__Bus.Subscriber
val subscribe_exn : ?on_callback_raise:(Core_kernel.Error.t ‑> Core_kernel__.Import.unit) ‑> 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> f:'callback ‑> 'callback Subscriber.t
val iter_exn : 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> f:'callback ‑> Core_kernel__.Import.unit
module Fold_arity = Core_kernel__Bus.Fold_arity
val fold_exn : 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> ('callback, 'f, 's) Fold_arity.t ‑> init:'s ‑> f:'f ‑> Core_kernel__.Import.unit
val unsubscribe : 'callback Read_only.t ‑> 'callback Subscriber.t ‑> Core_kernel__.Import.unit
val pipe1_exn : ('a ‑> unit) Read_only.t ‑> Core.Source_code_position.t ‑> 'a Async_extra__.Import.Pipe.Reader.t
pipe1_exn t
returns a pipe of updates from t
. If the pipe is closed you
will be unsubscribed. pipe1_exn
raises in the same circumstances as
subscribe_exn
.
module First_arity : sig ... end
val first_exn : ?stop:unit Async_extra__.Import.Deferred.t ‑> 'c Read_only.t ‑> Core.Source_code_position.t ‑> ('c, 'f, 'r) First_arity.t ‑> f:'f ‑> 'r Async_extra__.Import.Deferred.t
first_exn here t arity ~f
returns a deferred that becomes determined with value r
when the first event is published to t
where f
returns Some r
. first_exn
then
unsubscribes from t
, ensuring that f
is never called again after it returns
Some
. first_exn
raises if it can't subscribe to the bus, i.e. if subscribe_exn
raises. If f
raises, then first_exn
raises to the monitor in effect when
first_exn
was called. first_exn
takes time proportional to the number of bus
subscribers.
If stop
is provided and becomes determined, f
will not be called again, it will
unsubscribe from the bus, and the deferred that was returned by first_exn
will never
become determined.