A Bus
is a publisher/subscriber system within the memory space of the program. A
bus has a mutable set of subscribers, which can be modified using subscribe_exn
and
unsubscribe
.
create
returns a Bus.Read_write.t
, which you can use to write
values to the bus.
write
calls the callbacks of all current subscribers before returning.
In a ('callback, 'phantom) Bus.t
, 'phantom
is a read-write phantom type that
controls whether one can read values from or write values to the bus. The phantom
type states the capabilities one could ever have access to, not the capabilities that
are immediately available. In particular, if one wants to subscribe to a
Bus.Read_write.t
, one must call read_only
on it in order to get a
Bus.Read_only.t
that can be passed to subscribe_exn
. This is deliberate, and is
meant to avoid unintentional reads from code that should only be writing.
module Callback_arity : sig ... end
Callback_arity
states the type of callbacks stored in a bus. Using Callback_arity
is an implementation technique that allows callbacks to be defined as ordinary n-ary
curried functions (e.g., a1 -> a2 -> a3 -> r
), instead of forcing n-ary-variadic
callbacks to use tuples (e.g., a1 * a2 * a3 -> r
). This also avoids extra
allocation.
include sig ... end
val sexp_of_t : ('callback ‑> Sexplib.Sexp.t) ‑> ('phantom ‑> Sexplib.Sexp.t) ‑> ('callback, 'phantom) t ‑> Sexplib.Sexp.t
module Read_write : sig ... end
module Read_only : sig ... end
module On_subscription_after_first_write : sig ... end
val read_only : ('callback, _) t ‑> 'callback Read_only.t
val create : ?name:Core_kernel.Info.t ‑> Core_kernel.Source_code_position.t ‑> 'callback Callback_arity.t ‑> on_subscription_after_first_write:On_subscription_after_first_write.t ‑> on_callback_raise:(Core_kernel.Error.t ‑> Core_kernel__.Import.unit) ‑> 'callback Read_write.t
In create [%here] ArityN ~on_subscription_after_first_write ~on_callback_raise
,
[%here]
is stored in the resulting bus, and contained in %sexp_of: t
, which can
help with debugging.
If on_subscription_after_first_write
is Raise
, then subscribe_exn
will raise if
it is called after write
has been called the first time. If
on_subscription_after_first_write
is Allow_and_send_last_value
, then the bus will
remember the last value written and will send it to new subscribers.
If a callback raises, on_callback_raise
is called with an error containing the
exception.
If on_callback_raise
raises, then the exception is raised to write
and the bus is
closed.
val callback_arity : ('callback, _) t ‑> 'callback Callback_arity.t
val num_subscribers : (_, _) t ‑> Core_kernel__.Import.int
val is_closed : (_, _) t ‑> Core_kernel__.Import.bool
val close : 'callback Read_write.t ‑> Core_kernel__.Import.unit
close
disallows future write
s -- once close t
is called, all further calls to
write t
will raise. close
is idempotent. If close
is called from within a
callback, the current message will still be sent to all subscribed callbacks that
have not yet seen it before the close takes effect.
write
... write4
call all callbacks currently subscribed to the bus, with no
guarantee on the order in which they will be called. write
is non-allocating,
though the callbacks themselves may allocate. Calling writeN t
from within a
callback on t
if is_closed t
will raise.
val write : ('a ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> Core_kernel__.Import.unit
val write2 : ('a ‑> 'b ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> Core_kernel__.Import.unit
val write3 : ('a ‑> 'b ‑> 'c ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> 'c ‑> Core_kernel__.Import.unit
val write4 : ('a ‑> 'b ‑> 'c ‑> 'd ‑> Core_kernel__.Import.unit) Read_write.t ‑> 'a ‑> 'b ‑> 'c ‑> 'd ‑> Core_kernel__.Import.unit
module Subscriber : sig ... end
val subscribe_exn : ?on_callback_raise:(Core_kernel.Error.t ‑> Core_kernel__.Import.unit) ‑> 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> f:'callback ‑> 'callback Subscriber.t
subscribe_exn t [%here] ~f
adds the callback f
to the set of t
's subscribers,
and returns a Subscriber.t
that can later be used to unsubscribe
. [%here]
is
stored in the Subscriber.t
, and contained in %sexp_of: Subscriber.t
, which can
help with debugging. If subscribe_exn t
is called by a callback in t
, i.e.,
during write t
, the subscription takes effect for the next write
, but does not
affect the current write
. subscribe_exn
takes time proportional to the number of
callbacks.
If on_callback_raise
is supplied, then it will be called by write
whenever f
raises; only if that subsequently raises will t
's on_callback_raise
be called. If
on_callback_raise
is not supplied, then t
's on_callback_raise
will be called
whenever f
raises.
val iter_exn : 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> f:'callback ‑> Core_kernel__.Import.unit
iter_exn t [%here] ~f
is ignore (subscribe_exn t [%here] ~callback:f)
. This
captures the common usage in which one never wants to unsubscribe from a bus.
module Fold_arity : sig ... end
val fold_exn : 'callback Read_only.t ‑> Core_kernel.Source_code_position.t ‑> ('callback, 'f, 's) Fold_arity.t ‑> init:'s ‑> f:'f ‑> Core_kernel__.Import.unit
fold_exn t [%here] arity ~init ~f
folds over the bus events, threading a state value
to every call. It is otherwise similar to iter_exn
.
val unsubscribe : 'callback Read_only.t ‑> 'callback Subscriber.t ‑> Core_kernel__.Import.unit
unsubscribe t subscriber
removes the callback corresponding to subscriber
from
t
. unsubscribe
never raises and is idempotent. As with subscribe_exn
,
unsubscribe t
during write t
takes effect after the current write
finishes.
Also like subscribe_exn
, unsubscribe
takes time proportional to the number of
callbacks.