Module Bus
A Bus
is a publisher/subscriber system within the memory space of the program. A bus has a mutable set of subscribers, which can be modified using subscribe_exn
and unsubscribe
.
create
returns a Bus.Read_write.t
, which you can use to write
values to the bus. write
calls the callbacks of all current subscribers before returning.
In a ('callback, 'phantom) Bus.t
, 'phantom
is a read-write phantom type that controls whether one can read values from or write values to the bus. The phantom type states the capabilities one could ever have access to, not the capabilities that are immediately available. In particular, if one wants to subscribe to a Bus.Read_write.t
, one must call read_only
on it in order to get a Bus.Read_only.t
that can be passed to subscribe_exn
. This is deliberate, and is meant to avoid unintentional reads from code that should only be writing.
module Callback_arity : sig ... end
Callback_arity
states the type of callbacks stored in a bus. UsingCallback_arity
is an implementation technique that allows callbacks to be defined as ordinary n-ary curried functions (e.g.,a1 -> a2 -> a3 -> r
), instead of forcing n-ary-variadic callbacks to use tuples (e.g.,a1 * a2 * a3 -> r
). This also avoids extra allocation.
val sexp_of_t : ('callback -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('callback, 'phantom) t -> Ppx_sexp_conv_lib.Sexp.t
type ('callback, 'phantom) bus
= ('callback, 'phantom) t
module Read_write : sig ... end
module Read_only : sig ... end
module On_subscription_after_first_write : sig ... end
val read_only : ('callback, _) t -> 'callback Read_only.t
val create : ?name:Core_kernel.Info.t -> Core_kernel.Source_code_position.t -> 'callback Callback_arity.t -> on_subscription_after_first_write:On_subscription_after_first_write.t -> on_callback_raise:(Core_kernel.Error.t -> unit) -> 'callback Read_write.t
In
create [%here] ArityN ~on_subscription_after_first_write ~on_callback_raise
,[%here]
is stored in the resulting bus, and contained in%sexp_of: t
, which can help with debugging.If
on_subscription_after_first_write
isRaise
, thensubscribe_exn
will raise if it is called afterwrite
has been called the first time. Ifon_subscription_after_first_write
isAllow_and_send_last_value
, then the bus will remember the last value written and will send it to new subscribers.If a callback raises,
on_callback_raise
is called with an error containing the exception.If
on_callback_raise
raises, then the exception is raised towrite
and the bus is closed.
val callback_arity : ('callback, _) t -> 'callback Callback_arity.t
val num_subscribers : (_, _) t -> int
val is_closed : (_, _) t -> bool
val close : 'callback Read_write.t -> unit
close
disallows futurewrite
s -- onceclose t
is called, all further calls towrite t
will raise.close
is idempotent. Ifclose
is called from within a callback, the current message will still be sent to all subscribed callbacks that have not yet seen it before the close takes effect.
val write : ('a -> unit) Read_write.t -> 'a -> unit
val write2 : ('a -> 'b -> unit) Read_write.t -> 'a -> 'b -> unit
val write3 : ('a -> 'b -> 'c -> unit) Read_write.t -> 'a -> 'b -> 'c -> unit
val write4 : ('a -> 'b -> 'c -> 'd -> unit) Read_write.t -> 'a -> 'b -> 'c -> 'd -> unit
val write5 : ('a -> 'b -> 'c -> 'd -> 'e -> unit) Read_write.t -> 'a -> 'b -> 'c -> 'd -> 'e -> unit
module Subscriber : sig ... end
val subscribe_exn : ?extract_exn:bool -> ?on_callback_raise:(Core_kernel.Error.t -> unit) -> ?on_close:(unit -> unit) -> ('callback, [> Core_kernel.read ]) t -> Core_kernel.Source_code_position.t -> f:'callback -> 'callback Subscriber.t
subscribe_exn t [%here] ~f
adds the callbackf
to the set oft
's subscribers, and returns aSubscriber.t
that can later be used tounsubscribe
.[%here]
is stored in theSubscriber.t
, and contained in%sexp_of: Subscriber.t
, which can help with debugging. Ifsubscribe_exn t
is called by a callback int
, i.e., duringwrite t
, the subscription takes effect for the nextwrite
, but does not affect the currentwrite
.subscribe_exn
takes amortized constant time.If
on_callback_raise
is supplied, then it will be called bywrite
wheneverf
raises; only if that subsequently raises willt
'son_callback_raise
be called. Ifon_callback_raise
is not supplied, thent
'son_callback_raise
will be called wheneverf
raises.If
on_callback_raise
is supplied andextract_exn
is set to true, then the error passed to theon_callback_raise
method will contain only the exception raised byf
without any additional information about the bus subscription or backtrace.on_close
is called if you are still subscribed whenBus.close
is called.
val iter_exn : ('callback, [> Core_kernel.read ]) t -> Core_kernel.Source_code_position.t -> f:'callback -> unit
iter_exn t [%here] ~f
isignore (subscribe_exn t [%here] ~callback:f)
. This captures the common usage in which one never wants to unsubscribe from a bus.
module Fold_arity : sig ... end
val fold_exn : ('callback, [> Core_kernel.read ]) t -> Core_kernel.Source_code_position.t -> ('callback, 'f, 's) Fold_arity.t -> init:'s -> f:'f -> unit
fold_exn t [%here] arity ~init ~f
folds over the bus events, threading a state value to every call. It is otherwise similar toiter_exn
.
val unsubscribe : ('callback, [> Core_kernel.read ]) t -> 'callback Subscriber.t -> unit
unsubscribe t subscriber
removes the callback corresponding tosubscriber
fromt
.unsubscribe
never raises and is idempotent. As withsubscribe_exn
,unsubscribe t
duringwrite t
takes effect after the currentwrite
finishes. Also likesubscribe_exn
,unsubscribe
takes amortized constant time.