Module Bus

A Bus is a publisher/subscriber system within the memory space of the program. A bus has a mutable set of subscribers, which can be modified using subscribe_exn and unsubscribe.

create returns a Bus.Read_write.t, which you can use to write values to the bus. write calls the callbacks of all current subscribers before returning.

In a ('callback, 'phantom) Bus.t, 'phantom is a read-write phantom type that controls whether one can read values from or write values to the bus. The phantom type states the capabilities one could ever have access to, not the capabilities that are immediately available. In particular, if one wants to subscribe to a Bus.Read_write.t, one must call read_only on it in order to get a Bus.Read_only.t that can be passed to subscribe_exn. This is deliberate, and is meant to avoid unintentional reads from code that should only be writing.
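As a rough illustration of the overview above, the following sketch creates a bus, derives a read-only view for the subscriber, and writes two values. It assumes the module is reachable as Core_kernel.Bus in a library version matching the signatures on this page, and that [%here] is expanded by ppx_here (for example via ppx_jane). The names total, bus and reader are made up for the example.

```ocaml
module Bus = Core_kernel.Bus

(* Running sum of every value delivered to the subscriber. *)
let total = ref 0

let () =
  (* The writer creates the bus and keeps the read-write handle. *)
  let bus : (int -> unit) Bus.Read_write.t =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:Core_kernel.Error.raise
  in
  (* Subscribers go through the read-only view obtained with [read_only]. *)
  let reader : (int -> unit) Bus.Read_only.t = Bus.read_only bus in
  let (_ : (int -> unit) Bus.Subscriber.t) =
    Bus.subscribe_exn reader [%here] ~f:(fun x -> total := !total + x)
  in
  Bus.write bus 1;
  Bus.write bus 41
```

Passing bus directly to subscribe_exn would be rejected by the type checker, which is the point of the phantom parameter.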

Signature

module Callback_arity : sig .. end
Callback_arity states the type of callbacks stored in a bus.
type ('callback, 'phantom) t
val sexp_of_t : ('callback -> Sexplib.Sexp.t) -> ('phantom -> Sexplib.Sexp.t) -> ('callback, 'phantom) t -> Sexplib.Sexp.t
type ('callback, 'phantom) bus = ('callback, 'phantom) t
module Read_write : sig .. end
module Read_only : sig .. end
val read_only : ('callback, _) t -> 'callback Read_only.t
val create : ?name:Info.t -> Source_code_position.t -> 'callback Callback_arity.t -> allow_subscription_after_first_write:bool -> on_callback_raise:(Error.t -> unit) -> 'callback Read_write.t

In create [%here] ArityN ~allow_subscription_after_first_write ~on_callback_raise, [%here] is stored in the resulting bus, and contained in [%sexp_of: t], which can help with debugging. If allow_subscription_after_first_write is false, then subscribe_exn will raise if it is called after write has been called the first time. If a callback raises, on_callback_raise is called with an error containing the exception. If on_callback_raise raises, then the exception is raised to write and the bus is closed.
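The two create options described above can be sketched as follows, under the assumption that the module is available as Core_kernel.Bus with the signatures shown on this page and that ppx_here expands [%here]. The counter errors and the flag late_subscription_raised are illustrative names only.

```ocaml
module Bus = Core_kernel.Bus

(* Number of errors reported through the bus-level handler. *)
let errors = ref 0

let bus : (unit -> unit) Bus.Read_write.t =
  Bus.create [%here] Bus.Callback_arity.Arity1
    ~allow_subscription_after_first_write:false
    ~on_callback_raise:(fun (_ : Core_kernel.Error.t) -> incr errors)

let () =
  (* This callback always raises, so the write reports one error. *)
  let (_ : (unit -> unit) Bus.Subscriber.t) =
    Bus.subscribe_exn (Bus.read_only bus) [%here] ~f:(fun () -> failwith "boom")
  in
  Bus.write bus ()

(* [write] has now been called once, so this bus rejects new subscriptions. *)
let late_subscription_raised =
  try
    let (_ : (unit -> unit) Bus.Subscriber.t) =
      Bus.subscribe_exn (Bus.read_only bus) [%here] ~f:(fun () -> ())
    in
    false
  with
  | _ -> true
```

Because the handler here returns normally instead of raising, the exception from the callback does not reach the caller of write.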

val callback_arity : ('callback, _) t -> 'callback Callback_arity.t
val num_subscribers : (_, _) t -> int
val is_closed : (_, _) t -> bool
val close : 'callback Read_write.t -> unit

close disallows future writes -- once close t is called, all further calls to write t will raise. close is idempotent. If close is called from within a callback, the current message will still be sent to all subscribed callbacks that have not yet seen it before the close takes effect.
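A small sketch of the close behavior described above, assuming Core_kernel.Bus with the signatures on this page and ppx_here for [%here]; the binding names are hypothetical.

```ocaml
module Bus = Core_kernel.Bus

let bus : (int -> unit) Bus.Read_write.t =
  Bus.create [%here] Bus.Callback_arity.Arity1
    ~allow_subscription_after_first_write:true
    ~on_callback_raise:Core_kernel.Error.raise

let closed_before = Bus.is_closed bus

let () =
  Bus.close bus;
  (* [close] is idempotent, so a second call is harmless. *)
  Bus.close bus

let closed_after = Bus.is_closed bus

(* Any write after [close] raises. *)
let write_after_close_raised =
  try
    Bus.write bus 1;
    false
  with
  | _ -> true
```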

val write : 'callback Read_write.t -> 'callback

write calls all callbacks currently subscribed to the bus, with no guarantee on the order in which they will be called. write is fast and non-allocating, though the callbacks themselves may allocate.

write t raises if it is called from within a callback on t, or if is_closed t.
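The re-entrancy restriction can be observed with a callback that tries to write to its own bus and catches the resulting exception. This is a sketch that assumes Core_kernel.Bus with the signatures on this page and ppx_here for [%here]; reentrant_write_raised is an illustrative name.

```ocaml
module Bus = Core_kernel.Bus

(* Set to true when the nested write is rejected. *)
let reentrant_write_raised = ref false

let bus : (int -> unit) Bus.Read_write.t =
  Bus.create [%here] Bus.Callback_arity.Arity1
    ~allow_subscription_after_first_write:true
    ~on_callback_raise:Core_kernel.Error.raise

let () =
  let (_ : (int -> unit) Bus.Subscriber.t) =
    Bus.subscribe_exn (Bus.read_only bus) [%here] ~f:(fun x ->
      (* Writing to [bus] from one of its own callbacks raises. *)
      try Bus.write bus (x + 1) with
      | _ -> reentrant_write_raised := true)
  in
  Bus.write bus 0
```

Catching the exception inside the callback keeps it away from on_callback_raise, so the outer write completes normally.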

module Subscriber : sig .. end
val subscribe_exn : ?on_callback_raise:(Error.t -> unit) -> 'callback Read_only.t -> Source_code_position.t -> f:'callback -> 'callback Subscriber.t

subscribe_exn t [%here] ~f adds the callback f to the set of t's subscribers, and returns a Subscriber.t that can later be used to unsubscribe. [%here] is stored in the Subscriber.t, and contained in [%sexp_of: Subscriber.t], which can help with debugging. If subscribe_exn t is called by a callback in t, i.e. during write t, the subscription takes effect for the next write, but does not affect the current write. subscribe_exn takes time proportional to the number of callbacks.
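The rule about subscribing during a write can be sketched like this, assuming Core_kernel.Bus with the signatures on this page and ppx_here for [%here]. The first subscriber registers a second one from inside its callback; all binding names are hypothetical.

```ocaml
module Bus = Core_kernel.Bus

(* Calls received by the subscriber that is added during a write. *)
let late_calls = ref 0
let already_subscribed = ref false

let bus : (unit -> unit) Bus.Read_write.t =
  Bus.create [%here] Bus.Callback_arity.Arity1
    ~allow_subscription_after_first_write:true
    ~on_callback_raise:Core_kernel.Error.raise

let reader = Bus.read_only bus

let () =
  let (_ : (unit -> unit) Bus.Subscriber.t) =
    Bus.subscribe_exn reader [%here] ~f:(fun () ->
      if not !already_subscribed
      then begin
        already_subscribed := true;
        (* Subscribing from inside a callback, i.e. during [write]. *)
        let (_ : (unit -> unit) Bus.Subscriber.t) =
          Bus.subscribe_exn reader [%here] ~f:(fun () -> incr late_calls)
        in
        ()
      end)
  in
  Bus.write bus ()

(* The late subscriber did not see the write during which it was added. *)
let late_calls_after_first_write = !late_calls

let () = Bus.write bus ()

(* It does see the next write. *)
let late_calls_after_second_write = !late_calls
```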

If on_callback_raise is supplied, then it will be called by write whenever f raises; only if that subsequently raises will t's on_callback_raise be called. If on_callback_raise is not supplied, then t's on_callback_raise will be called whenever f raises.
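To illustrate the precedence between the two handlers, the sketch below gives a raising callback its own on_callback_raise and checks that the bus-level handler stays untouched. It assumes Core_kernel.Bus with the signatures on this page and ppx_here for [%here]; the two counters are illustrative.

```ocaml
module Bus = Core_kernel.Bus

let subscriber_errors = ref 0
let bus_errors = ref 0

let () =
  let bus : (unit -> unit) Bus.Read_write.t =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:(fun (_ : Core_kernel.Error.t) -> incr bus_errors)
  in
  let (_ : (unit -> unit) Bus.Subscriber.t) =
    Bus.subscribe_exn
      ~on_callback_raise:(fun (_ : Core_kernel.Error.t) -> incr subscriber_errors)
      (Bus.read_only bus)
      [%here]
      ~f:(fun () -> failwith "boom")
  in
  (* The subscriber's own handler absorbs the failure. *)
  Bus.write bus ()
```

Had the subscriber's handler itself raised, the bus-level handler would have been called next.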

val iter_exn : 'callback Read_only.t -> Source_code_position.t -> f:'callback -> unit

iter_exn t [%here] ~f is ignore (subscribe_exn t [%here] ~f). This captures the common usage in which one never wants to unsubscribe from a bus.

module Fold_arity : sig .. end
val fold_exn : 'callback Read_only.t -> Source_code_position.t -> ('callback, 'f, 's) Fold_arity.t -> init:'s -> f:'f -> unit

fold_exn t [%here] arity ~init ~f folds over the bus events, threading a state value to every call. It is otherwise similar to iter_exn.
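A possible use of fold_exn is keeping a running sum of the values written to a bus. The sketch assumes Core_kernel.Bus with the signatures on this page, a Fold_arity.Arity1 constructor whose fold function has the shape state -> value -> state, and ppx_here for [%here]; last_sum is an illustrative name.

```ocaml
module Bus = Core_kernel.Bus

(* Most recent state produced by the fold, exposed for inspection. *)
let last_sum = ref 0

let () =
  let bus : (int -> unit) Bus.Read_write.t =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:Core_kernel.Error.raise
  in
  Bus.fold_exn (Bus.read_only bus) [%here] Bus.Fold_arity.Arity1 ~init:0
    ~f:(fun sum x ->
      let sum = sum + x in
      last_sum := sum;
      (* The returned value is the state passed to the next call. *)
      sum);
  Bus.write bus 1;
  Bus.write bus 2;
  Bus.write bus 3
```

Since fold_exn returns unit, the state is only visible inside f; the ref is one way to get it out.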

val unsubscribe : 'callback Read_only.t -> 'callback Subscriber.t -> unit

unsubscribe t subscriber removes the callback corresponding to subscriber from t. unsubscribe never raises and is idempotent. As with subscribe_exn, unsubscribe t during write t takes effect after the current write finishes. Also like subscribe_exn, unsubscribe takes time proportional to the number of callbacks.
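A short sketch of unsubscribing outside of a write, assuming Core_kernel.Bus with the signatures on this page and ppx_here for [%here]; calls and subscriber are illustrative names.

```ocaml
module Bus = Core_kernel.Bus

(* Number of writes the subscriber has seen. *)
let calls = ref 0

let () =
  let bus : (unit -> unit) Bus.Read_write.t =
    Bus.create [%here] Bus.Callback_arity.Arity1
      ~allow_subscription_after_first_write:true
      ~on_callback_raise:Core_kernel.Error.raise
  in
  let reader = Bus.read_only bus in
  let subscriber = Bus.subscribe_exn reader [%here] ~f:(fun () -> incr calls) in
  Bus.write bus ();
  Bus.unsubscribe reader subscriber;
  (* [unsubscribe] is idempotent, so repeating it is harmless. *)
  Bus.unsubscribe reader subscriber;
  (* The callback has been removed and does not see this write. *)
  Bus.write bus ()
```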