Up

module Bus

: sig
#
type 'a t
include Core.Std.Invariant.S1 with type 'a t := 'a t
#
val create : can_subscribe_after_start:bool -> 'a t

create creates a new, unstarted, bus.

can_subscribe_after_start determines whether subscribe_exn succeeds after the bus is started.

#
val start : _ t -> unit

start t starts the bus; it starts delivery of queued messages to readers. start will not run any subscribers; it just creates the Async job that will start running them.

#
val flushed : _ t -> unit Import.Deferred.t

flushed t returns a deferred that becomes determined when all subscribers have processed all values previously writen to t.

#
val write : 'a t -> 'a -> unit

write t a enqueues a on the bus, but does not call any subscriber functions immediately. Multiple write calls in the same Async cycle are efficient (i.e. they don't create a deferred per item).

#
module Subscriber : sig

subscribe_exn t ~f causes f to be applied to all values subsequently written to t (or if t is unstarted, to prior values as well). subscribe_exn raises if it is called on a started bus with not can_subscribe_after_start. The function f is allowed to call other Bus functions on t, e.g. write, subscribe_exn, or unsubscribe.

If f raises, the corresponding subscriber will be automatically unsubscribed, and the exception will be sent to the monitor in effect when subscribe_exn was called.

Once unsubscribe t subscriber is called, f will never be called again.

#
type 'a t
#
val sexp_of_t : ('a -> Sexplib.Sexp.t) -> 'a t -> Sexplib.Sexp.t
end
#
val subscribe_exn : 'a t -> f:('a -> unit) -> 'a Subscriber.t
#
val unsubscribe : 'a t -> 'a Subscriber.t -> unit
#
val reader_exn : 'a t -> 'a Import.Pipe.Reader.t

reader_exn t returns a pipe that contains all elements subsequently written to t (and including prior values, if t is unstarted). The difference with subscribe_exn is that the consumer may be an arbitrary amount behind other subscribers/consumers. Pushback on the pipe is not honored. Closing the reader is equivalent to calling unsubscribe.

#
val sexp_of_t : ('a -> Sexplib.Sexp.t) -> 'a t -> Sexplib.Sexp.t
end