Module Bus = Core_kernel.Bus

Signature

module Callback_arity : sig .. end
Callback_arity states the type of callbacks stored in a bus.
type ('callback, 'phantom) t
val sexp_of_t : ('callback -> Sexplib.Sexp.t) -> ('phantom -> Sexplib.Sexp.t) -> ('callback, 'phantom) t -> Sexplib.Sexp.t
type ('callback, 'phantom) bus = ('callback, 'phantom) t
module Read_write : sig .. end
module Read_only : sig .. end
val read_only : ('callback, _) t -> 'callback Read_only.t
val create : ?name:Core_kernel.Info.t -> Core_kernel.Source_code_position.t -> 'callback Callback_arity.t -> allow_subscription_after_first_write:bool -> on_callback_raise:(Core_kernel.Error.t -> unit) -> 'callback Read_write.t

In create [%here] ArityN ~allow_subscription_after_first_write ~on_callback_raise, [%here] is stored in the resulting bus and included in [%sexp_of: t], which can help with debugging. If allow_subscription_after_first_write is false, then subscribe_exn will raise if it is called after write has been called for the first time. If a callback raises, on_callback_raise is called with an error containing the exception. If on_callback_raise itself raises, then the exception is raised to write and the bus is closed.
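
As an illustration of the paragraph above, here is a minimal sketch of creating a bus for one-argument callbacks and observing the effect of ~allow_subscription_after_first_write:false. The name create_demo is hypothetical, and the sketch assumes that the ppx_here rewriter is enabled so that [%here] expands, and that Arity1 is the one-argument constructor of Callback_arity.

```ocaml
(* Sketch only: assumes ppx_here (for [%here]) is enabled in the build. *)
module Bus = Core_kernel.Bus

(* [create_demo] is a hypothetical name used for illustration. *)
let create_demo () =
  let bus =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:false
      ~on_callback_raise:Core_kernel.Error.raise
  in
  let ro = Bus.read_only bus in
  let seen = ref [] in
  (* Subscribe before the first write; this is always allowed. *)
  Bus.iter_exn ro [%here] ~f:(fun (x : int) -> seen := x :: !seen);
  Bus.write bus 1;
  Bus.write bus 2;
  (* Subscribing after the first write should raise, given the flag above. *)
  let late_subscribe_raised =
    try
      ignore (Bus.subscribe_exn ro [%here] ~f:(fun (_ : int) -> ()));
      false
    with
    | _ -> true
  in
  List.rev !seen, late_subscribe_raised
```

Passing Core_kernel.Error.raise as on_callback_raise is one possible choice: per the description above, it means that a raising callback propagates its exception to write and closes the bus.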

val callback_arity : ('callback, _) t -> 'callback Callback_arity.t
val num_subscribers : (_, _) t -> int
val is_closed : (_, _) t -> bool
val close : 'callback Read_write.t -> unit

close disallows future writes: once close t is called, all further calls to write t will raise. close is idempotent. If close is called from within a callback, the current message will still be sent to all subscribed callbacks that have not yet seen it before the close takes effect.
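
The behaviour of close described above can be sketched as follows. The name close_demo is hypothetical, and the sketch assumes ppx_here is enabled for [%here] and that Arity1 is the one-argument constructor of Callback_arity.

```ocaml
(* Sketch only: assumes ppx_here (for [%here]) is enabled in the build. *)
module Bus = Core_kernel.Bus

(* [close_demo] is a hypothetical name used for illustration. *)
let close_demo () =
  let bus =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:Core_kernel.Error.raise
  in
  Bus.write bus "first";
  Bus.close bus;
  (* A second close is a no-op, since close is idempotent. *)
  Bus.close bus;
  (* Writing to a closed bus should raise. *)
  let write_raised =
    try
      Bus.write bus "second";
      false
    with
    | _ -> true
  in
  Bus.is_closed bus, write_raised
```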

val write : 'callback Read_write.t -> 'callback

write calls all callbacks currently subscribed to the bus, with no guarantee on the order in which they will be called. write is fast and non-allocating, though the callbacks themselves may allocate.

write t raises if it is called from within a callback on t, or if is_closed t is true.

module Subscriber : sig .. end
val subscribe_exn : ?on_callback_raise:(Core_kernel.Error.t -> unit) -> 'callback Read_only.t -> Core_kernel.Source_code_position.t -> f:'callback -> 'callback Subscriber.t

subscribe_exn t [%here] ~f adds the callback f to the set of t's subscribers, and returns a Subscriber.t that can later be used to unsubscribe. [%here] is stored in the Subscriber.t and included in [%sexp_of: Subscriber.t], which can help with debugging. If subscribe_exn t is called by a callback in t, i.e. during write t, the subscription takes effect for the next write, but does not affect the current write. subscribe_exn takes time proportional to the number of callbacks.

If on_callback_raise is supplied, then it will be called by write whenever f raises; only if that subsequently raises will t's on_callback_raise be called. If on_callback_raise is not supplied, then t's on_callback_raise will be called whenever f raises.
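
To illustrate how the two levels of on_callback_raise interact, the following sketch subscribes one raising callback with its own handler and one without, then counts which handler is invoked. The name handler_demo is hypothetical; the sketch assumes ppx_here is enabled for [%here], that Arity1 is the one-argument constructor of Callback_arity, and that write continues with the remaining callbacks once a handler returns normally.

```ocaml
(* Sketch only: assumes ppx_here (for [%here]) is enabled in the build. *)
module Bus = Core_kernel.Bus

(* [handler_demo] is a hypothetical name used for illustration. *)
let handler_demo () =
  let bus_errors = ref 0 in
  let subscriber_errors = ref 0 in
  let bus =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:(fun _ -> incr bus_errors)
  in
  let ro = Bus.read_only bus in
  (* This subscriber has its own handler, which takes precedence. *)
  let _with_handler =
    Bus.subscribe_exn ro [%here]
      ~on_callback_raise:(fun _ -> incr subscriber_errors)
      ~f:(fun (_ : int) -> failwith "boom")
  in
  (* This subscriber falls back to the bus-level handler. *)
  let _without_handler =
    Bus.subscribe_exn ro [%here] ~f:(fun (_ : int) -> failwith "bang")
  in
  Bus.write bus 0;
  !subscriber_errors, !bus_errors
```

Under the rules stated above, each handler should be called exactly once for the single write, so the function is expected to return (1, 1).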

val iter_exn : 'callback Read_only.t -> Core_kernel.Source_code_position.t -> f:'callback -> unit

iter_exn t [%here] ~f is ignore (subscribe_exn t [%here] ~f). This captures the common usage in which one never wants to unsubscribe from a bus.

module Fold_arity : sig .. end
val fold_exn : 'callback Read_only.t -> Core_kernel.Source_code_position.t -> ('callback, 'f, 's) Fold_arity.t -> init:'s -> f:'f -> unit

fold_exn t [%here] arity ~init ~f folds over the bus events, threading a state value to every call. It is otherwise similar to iter_exn.
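
A minimal sketch of fold_exn, keeping a running sum of the values written to the bus. The name fold_demo is hypothetical. The sketch assumes ppx_here is enabled for [%here], and that Fold_arity, like Callback_arity, provides an Arity1 constructor pairing a one-argument callback with a fold function of type 's -> 'a -> 's; the Fold_arity signature is not shown on this page, so treat that as an assumption.

```ocaml
(* Sketch only: assumes ppx_here (for [%here]) is enabled in the build. *)
module Bus = Core_kernel.Bus

(* [fold_demo] is a hypothetical name used for illustration. *)
let fold_demo () =
  let bus =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:Core_kernel.Error.raise
  in
  let latest_total = ref 0 in
  (* Assumption: Fold_arity.Arity1 matches one-argument callbacks. *)
  Bus.fold_exn (Bus.read_only bus) [%here] Bus.Fold_arity.Arity1 ~init:0
    ~f:(fun total x ->
      let total = total + x in
      (* Expose the threaded state so the caller can observe it. *)
      latest_total := total;
      total);
  Bus.write bus 3;
  Bus.write bus 4;
  !latest_total
```

Because fold_exn returns unit, the threaded state is only visible inside f; the sketch copies it into a ref so that it can be inspected afterwards.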

val unsubscribe : 'callback Read_only.t -> 'callback Subscriber.t -> unit

unsubscribe t subscriber removes the callback corresponding to subscriber from t. unsubscribe never raises and is idempotent. As with subscribe_exn, unsubscribe t during write t takes effect after the current write finishes. Also like subscribe_exn, unsubscribe takes time proportional to the number of callbacks.
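
The subscribe and unsubscribe cycle described above can be sketched as follows. The name unsubscribe_demo is hypothetical, and the sketch assumes ppx_here is enabled for [%here] and that Arity1 is the one-argument constructor of Callback_arity.

```ocaml
(* Sketch only: assumes ppx_here (for [%here]) is enabled in the build. *)
module Bus = Core_kernel.Bus

(* [unsubscribe_demo] is a hypothetical name used for illustration. *)
let unsubscribe_demo () =
  let bus =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:Core_kernel.Error.raise
  in
  let ro = Bus.read_only bus in
  let calls = ref 0 in
  let subscriber = Bus.subscribe_exn ro [%here] ~f:(fun (_ : int) -> incr calls) in
  Bus.write bus 1;
  Bus.unsubscribe ro subscriber;
  (* A second unsubscribe is a no-op, since unsubscribe is idempotent. *)
  Bus.unsubscribe ro subscriber;
  (* The callback has been removed, so this write does not reach it. *)
  Bus.write bus 2;
  !calls, Bus.num_subscribers bus
```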