module Bus: sig .. end
type 'a t
include Invariant.S1
val create : can_subscribe_after_start:bool -> 'a t
create creates a new, unstarted bus. can_subscribe_after_start determines whether
subscribe_exn succeeds after the bus is started.
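The effect of can_subscribe_after_start can be sketched as follows. This is a minimal illustration, not part of the library: it assumes the module documented here is reachable as Bus after open Async.Std, and late_subscribe_ok is a hypothetical helper name.

```ocaml
open Core.Std
open Async.Std

(* Assumption: the module documented on this page is in scope as [Bus]
   after [open Async.Std]; adjust the path if your build exposes it
   elsewhere. [late_subscribe_ok] is a hypothetical helper. *)

(* Returns [true] if a subscription made after [start] is accepted,
   and [false] if [subscribe_exn] raises. *)
let late_subscribe_ok ~can_subscribe_after_start =
  let bus : int Bus.t = Bus.create ~can_subscribe_after_start in
  Bus.start bus;
  try
    let (_ : int Bus.Subscriber.t) = Bus.subscribe_exn bus ~f:ignore in
    true
  with _ -> false
```

Going by the description above, the helper should return true for a bus created with ~can_subscribe_after_start:true and false otherwise.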
val start : 'a t -> unit
start t starts the bus; it starts delivery of queued messages to readers. start will not
run any subscribers; it just creates the async job that will start running them.
val flushed : 'a t -> unit Import.Deferred.t
flushed t returns a deferred that becomes determined when all subscribers have processed
all values previously written to t.
val write : 'a t -> 'a -> unit
write t a enqueues a on the bus, but does not call any subscriber functions immediately.
Multiple write calls in the same async cycle are efficient (i.e. they don't create a
deferred per item).
module Subscriber: sig .. end
subscribe_exn t ~f causes f to be applied to all values subsequently written to t (or, if
t is unstarted, to prior values as well).
val subscribe_exn : 'a t -> f:('a -> unit) -> 'a Subscriber.t
val unsubscribe : 'a t -> 'a Subscriber.t -> unit
val reader_exn : 'a t -> 'a Import.Pipe.Reader.t
reader_exn t returns a pipe that contains all elements subsequently written to t
(including prior values, if t is unstarted). The difference from subscribe_exn is that the
consumer may be an arbitrary amount behind other subscribers/consumers. Pushback on the
pipe is not honored. Closing the reader is equivalent to calling unsubscribe.
val sexp_of_t : ('a -> Sexplib.Sexp.t) -> 'a t -> Sexplib.Sexp.t
subscribe_exn t ~f causes f to be applied to all values subsequently written to t (or, if
t is unstarted, to prior values as well). subscribe_exn raises if it is called on a
started bus that was created with can_subscribe_after_start set to false. The function f
is allowed to call other Bus functions on t, e.g. write, subscribe_exn, or unsubscribe.
If f raises, the corresponding subscriber will be automatically unsubscribed, and the
exception will be sent to the monitor in effect when subscribe_exn was called.
Once unsubscribe t subscriber is called, f will never be called again.
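A typical lifecycle (write, subscribe, start, flush, unsubscribe) might look like the sketch below. It uses only the signatures listed on this page and assumes the module is in scope as Bus after open Async.Std; collect is a hypothetical name chosen for the example.

```ocaml
open Core.Std
open Async.Std

(* Assumption: [Bus] is the module documented on this page.
   [collect] is a hypothetical helper that records every value
   delivered to a single subscriber. *)
let collect () : int list Deferred.t =
  let seen = ref [] in
  let bus = Bus.create ~can_subscribe_after_start:false in
  (* Written while the bus is unstarted, so it is only queued. *)
  Bus.write bus 1;
  (* Subscribing before [start] is always allowed. *)
  let subscriber = Bus.subscribe_exn bus ~f:(fun x -> seen := x :: !seen) in
  Bus.start bus;
  Bus.write bus 2;
  (* Wait until the subscriber has processed everything written so far. *)
  Bus.flushed bus
  >>| fun () ->
  (* After this call, [f] will not be called again. *)
  Bus.unsubscribe bus subscriber;
  List.rev !seen
```

Because the subscriber is attached while the bus is still unstarted, the descriptions above suggest it should see both the value written before the subscription and the one written after start.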
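Consuming a bus through reader_exn could be sketched as follows. As before, this assumes the module is in scope as Bus after open Async.Std; first_two is a hypothetical helper, and Pipe.read and Pipe.close_read are the standard Async pipe operations.

```ocaml
open Core.Std
open Async.Std

(* Assumption: [Bus] is the module documented on this page.
   [first_two] is a hypothetical helper that reads two values
   from the bus through a pipe. *)
let first_two () : (string * string) Deferred.t =
  let bus = Bus.create ~can_subscribe_after_start:false in
  (* Obtain the reader before starting, which is always allowed. *)
  let reader = Bus.reader_exn bus in
  Bus.start bus;
  Bus.write bus "a";
  Bus.write bus "b";
  Pipe.read reader
  >>= fun first ->
  Pipe.read reader
  >>| fun second ->
  (* Per the text, closing the reader is equivalent to unsubscribing. *)
  Pipe.close_read reader;
  match first, second with
  | `Ok x, `Ok y -> (x, y)
  | _ -> failwith "pipe closed before two values arrived"
```

Since pushback on the pipe is not honored, a slow consumer like this one does not hold back other subscribers; values simply accumulate in the pipe until they are read.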