Module Core_kernel.Bus
A Bus is a publisher/subscriber system within the memory space of the program. A bus has a mutable set of subscribers, which can be modified using subscribe_exn and unsubscribe.
create returns a Bus.Read_write.t, which you can use to write values to the bus. write calls the callbacks of all current subscribers before returning.
In a ('callback, 'phantom) Bus.t, 'phantom is a read-write phantom type that controls whether one can read values from or write values to the bus. The phantom type states the capabilities one could ever have access to, not the capabilities that are immediately available. In particular, if one wants to subscribe to a Bus.Read_write.t, one must call read_only on it in order to get a Bus.Read_only.t that can be passed to subscribe_exn. This is deliberate, and is meant to avoid unintentional reads from code that should only be writing.
- module Callback_arity : sig ... end
- Callback_aritystates the type of callbacks stored in a bus. Using- Callback_arityis an implementation technique that allows callbacks to be defined as ordinary n-ary curried functions (e.g.,- a1 -> a2 -> a3 -> r), instead of forcing n-ary-variadic callbacks to use tuples (e.g.,- a1 * a2 * a3 -> r). This also avoids extra allocation.
- val sexp_of_t : ('callback -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('callback, 'phantom) t -> Ppx_sexp_conv_lib.Sexp.t
- type ('callback, 'phantom) bus- = ('callback, 'phantom) t
module Read_write : sig ... endmodule Read_only : sig ... endmodule On_subscription_after_first_write : sig ... end- val read_only : ('callback, _) t -> 'callback Read_only.t
- val create : ?name:Info.t -> Source_code_position.t -> 'callback Callback_arity.t -> on_subscription_after_first_write:On_subscription_after_first_write.t -> on_callback_raise:(Error.t -> Core_kernel__.Import.unit) -> 'callback Read_write.t
- In - create [%here] ArityN ~on_subscription_after_first_write ~on_callback_raise,- [%here]is stored in the resulting bus, and contained in- %sexp_of: t, which can help with debugging.- If - on_subscription_after_first_writeis- Raise, then- subscribe_exnwill raise if it is called after- writehas been called the first time. If- on_subscription_after_first_writeis- Allow_and_send_last_value, then the bus will remember the last value written and will send it to new subscribers.- If a callback raises, - on_callback_raiseis called with an error containing the exception.- If - on_callback_raiseraises, then the exception is raised to- writeand the bus is closed.
- val callback_arity : ('callback, _) t -> 'callback Callback_arity.t
- val num_subscribers : (_, _) t -> Core_kernel__.Import.int
- val is_closed : (_, _) t -> Core_kernel__.Import.bool
- val close : 'callback Read_write.t -> Core_kernel__.Import.unit
- closedisallows future- writes -- once- close tis called, all further calls to- write twill raise.- closeis idempotent. If- closeis called from within a callback, the current message will still be sent to all subscribed callbacks that have not yet seen it before the close takes effect.
- val write : ('a -> Core_kernel__.Import.unit) Read_write.t -> 'a -> Core_kernel__.Import.unit
- val write2 : ('a -> 'b -> Core_kernel__.Import.unit) Read_write.t -> 'a -> 'b -> Core_kernel__.Import.unit
- val write3 : ('a -> 'b -> 'c -> Core_kernel__.Import.unit) Read_write.t -> 'a -> 'b -> 'c -> Core_kernel__.Import.unit
- val write4 : ('a -> 'b -> 'c -> 'd -> Core_kernel__.Import.unit) Read_write.t -> 'a -> 'b -> 'c -> 'd -> Core_kernel__.Import.unit
module Subscriber : sig ... end- val subscribe_exn : ?extract_exn:Core_kernel__.Import.bool -> ?on_callback_raise:(Error.t -> Core_kernel__.Import.unit) -> ?on_close:(Core_kernel__.Import.unit -> Core_kernel__.Import.unit) -> 'callback Read_only.t -> Source_code_position.t -> f:'callback -> 'callback Subscriber.t
- subscribe_exn t [%here] ~fadds the callback- fto the set of- t's subscribers, and returns a- Subscriber.tthat can later be used to- unsubscribe.- [%here]is stored in the- Subscriber.t, and contained in- %sexp_of: Subscriber.t, which can help with debugging. If- subscribe_exn tis called by a callback in- t, i.e., during- write t, the subscription takes effect for the next- write, but does not affect the current- write.- subscribe_exntakes time proportional to the number of callbacks.- If - on_callback_raiseis supplied, then it will be called by- writewhenever- fraises; only if that subsequently raises will- t's- on_callback_raisebe called. If- on_callback_raiseis not supplied, then- t's- on_callback_raisewill be called whenever- fraises.- If - on_callback_raiseis supplied and- extract_exnis set to true, then the error passed to the- on_callback_raisemethod will contain only the exception raised by- fwithout any additional information about the bus subscription or backtrace.- on_closeis called if you are still subscribed when- Bus.closeis called.
- val iter_exn : 'callback Read_only.t -> Source_code_position.t -> f:'callback -> Core_kernel__.Import.unit
- iter_exn t [%here] ~fis- ignore (subscribe_exn t [%here] ~callback:f). This captures the common usage in which one never wants to unsubscribe from a bus.
module Fold_arity : sig ... end- val fold_exn : 'callback Read_only.t -> Source_code_position.t -> ('callback, 'f, 's) Fold_arity.t -> init:'s -> f:'f -> Core_kernel__.Import.unit
- fold_exn t [%here] arity ~init ~ffolds over the bus events, threading a state value to every call. It is otherwise similar to- iter_exn.
- val unsubscribe : 'callback Read_only.t -> 'callback Subscriber.t -> Core_kernel__.Import.unit
- unsubscribe t subscriberremoves the callback corresponding to- subscriberfrom- t.- unsubscribenever raises and is idempotent. As with- subscribe_exn,- unsubscribe tduring- write ttakes effect after the current- writefinishes. Also like- subscribe_exn,- unsubscribetakes time proportional to the number of callbacks.