Module Async_extra__.Bus
include module type of sig ... end
type ('callback, 'phantom) t
= ('callback, 'phantom) Core_kernel__Bus.t
val sexp_of_t : ('callback -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('callback, 'phantom) t -> Ppx_sexp_conv_lib.Sexp.t
type ('callback, 'phantom) bus
= ('callback, 'phantom) t
module Read_write = Core_kernel__Bus.Read_write
module Read_only = Core_kernel__Bus.Read_only
val read_only : ('callback, 'a) t -> 'callback Read_only.t
val create : ?name:Core_kernel.Info.t -> Core_kernel.Source_code_position.t -> 'callback Callback_arity.t -> on_subscription_after_first_write:On_subscription_after_first_write.t -> on_callback_raise:(Core_kernel.Error.t -> Core_kernel__.Import.unit) -> 'callback Read_write.t
val callback_arity : ('callback, 'a) t -> 'callback Callback_arity.t
val num_subscribers : ('a, 'b) t -> Core_kernel__.Import.int
val is_closed : ('a, 'b) t -> Core_kernel__.Import.bool
val close : 'callback Read_write.t -> Core_kernel__.Import.unit
val write : ('a -> Core_kernel__.Import.unit) Read_write.t -> 'a -> Core_kernel__.Import.unit
val write2 : ('a -> 'b -> Core_kernel__.Import.unit) Read_write.t -> 'a -> 'b -> Core_kernel__.Import.unit
val write3 : ('a -> 'b -> 'c -> Core_kernel__.Import.unit) Read_write.t -> 'a -> 'b -> 'c -> Core_kernel__.Import.unit
val write4 : ('a -> 'b -> 'c -> 'd -> Core_kernel__.Import.unit) Read_write.t -> 'a -> 'b -> 'c -> 'd -> Core_kernel__.Import.unit
module Subscriber = Core_kernel__Bus.Subscriber
val subscribe_exn : ?extract_exn:Core_kernel__.Import.bool -> ?on_callback_raise:(Core_kernel.Error.t -> Core_kernel__.Import.unit) -> ?on_close:(Core_kernel__.Import.unit -> Core_kernel__.Import.unit) -> 'callback Read_only.t -> Core_kernel.Source_code_position.t -> f:'callback -> 'callback Subscriber.t
val iter_exn : 'callback Read_only.t -> Core_kernel.Source_code_position.t -> f:'callback -> Core_kernel__.Import.unit
module Fold_arity = Core_kernel__Bus.Fold_arity
val fold_exn : 'callback Read_only.t -> Core_kernel.Source_code_position.t -> ('callback, 'f, 's) Fold_arity.t -> init:'s -> f:'f -> Core_kernel__.Import.unit
val unsubscribe : 'callback Read_only.t -> 'callback Subscriber.t -> Core_kernel__.Import.unit
val pipe1_exn : ('a -> unit) Read_only.t -> Core.Source_code_position.t -> 'a Async_extra__.Import.Pipe.Reader.t
[pipe1_exn t] returns a pipe of updates from [t] by subscribing to [t]. Closing
the pipe unsubscribes from [t]. Closing [t] closes the pipe. [pipe1_exn] raises
in the same circumstances as [subscribe_exn].
module First_arity : sig ... end
val first_exn : ?stop:unit Async_extra__.Import.Deferred.t -> 'c Read_only.t -> Core.Source_code_position.t -> ('c, 'f, 'r) First_arity.t -> f:'f -> 'r Async_extra__.Import.Deferred.t
[first_exn t here arity ~f] returns a deferred that becomes determined with
value [r] when the first event is published to [t] where [f] returns [Some r].
[first_exn] then unsubscribes from [t], ensuring that [f] is never called again
after it returns [Some]. [first_exn] raises if it can't subscribe to the bus,
i.e., if [subscribe_exn] raises. If [f] raises, then [first_exn] raises to the
monitor in effect when [first_exn] was called. [first_exn] takes time
proportional to the number of bus subscribers.

If [stop] is provided and becomes determined, [f] will not be called again, it
will unsubscribe from the bus, and the deferred that was returned by
[first_exn] will never become determined.