An immutable sequence of values, with a possibly incomplete tail that may be extended asynchronously.
For most applications one should use Pipe instead of Stream. One justifiable usage
of Stream
rather than Pipe
is in single-writer, multi-consumer (multicast)
scenarios where pushback is not required.
The basic primitive operation for getting the next element out of stream is
Stream.next
, which (asynchronously) returns the element and the rest of the
stream.
module Deferred = Async_kernel__.Deferred1
sexp_of_t t f
returns a sexp of all of the elements currently available in the
stream. It is just for display purposes. There is no t_of_sexp
.
include sig ... end
val sexp_of_t : ('a ‑> Base.Sexp.t) ‑> 'a t ‑> Base.Sexp.t
val create : ('a Async_kernel.Tail.t ‑> unit) ‑> 'a t
create f
returns a stream t
and calls f tail
, where the elements of the stream
are determined as the tail is extended, and the end of the stream is reached when the
tail is closed.
next t
returns a deferred that will become determined when the next part of the
stream is determined. This is Cons (v, t')
, where v is the next element of the
stream and t' is the rest of the stream, or with Nil at the end of the stream.
val next : 'a t ‑> 'a next Deferred.t
val first_exn : 'a t ‑> 'a Deferred.t
first_exn t
returns a deferred that becomes determined with the first element of
t
.
Streams can be converted to and from lists. Although, conversion to a list returns a deferred, because the stream is determined asynchronously.
val to_list : 'a t ‑> 'a list Deferred.t
to_list t
returns a deferred that will become determined with the list
of elements in t, if the end of t is reached.
val of_fun : (unit ‑> 'a Deferred.t) ‑> 'a t
of_fun f
returns a stream whose elements are determined by calling f
forever.
val copy_to_tail : 'a t ‑> 'a Async_kernel.Tail.t ‑> unit Deferred.t
copy_to_tail t tail
reads elements from t
and puts them in tail
, until
the end of t
is reached.
Sequence operations ---------------------------------------------------------------------- There are the usual sequence operations:
append, fold, iter, map, filter_map, take
There are also deferred variants:
iter', map', filter_map'
These take anonymous functions that return deferreds generalizing the usual sequence operation and allowing the client to control the rate at which the sequence is processed.
append t1 t2
returns a stream with all the values of t1, in order, and if t1 ends,
these values are followed by all the values of t2.
concat t
takes a stream of streams and produces a stream that is the concatenation
of each stream in order (you see all of stream 1, then all of stream 2... etc.)
filter_deprecated s ~f
returns a stream with one element, v, for each v in s such
with f v = true.
Using filter_deprecated
can easily lead to space leaks. It is better to use
Async.Pipe
than Async.Stream
.
filter_map_deprecated s ~f
returns a stream with one element, v', for each v in s
such with f v = Some v'.
Using filter_map_deprecated
can easily lead to space leaks. It is better to use
Async.Pipe
than Async.Stream
.
val fold' : 'a t ‑> init:'b ‑> f:('b ‑> 'a ‑> 'b Deferred.t) ‑> 'b Deferred.t
fold' t ~init ~f
is like list fold, walking over the elements of the stream in
order, as they become available. fold'
returns a deferred that will yield the final
value of the accumulator, if the end of the stream is reached.
val fold : 'a t ‑> init:'b ‑> f:('b ‑> 'a ‑> 'b) ‑> 'b Deferred.t
fold t ~init ~f
is a variant of fold'
in which f
does not return a deferred.
val iter' : 'a t ‑> f:('a ‑> unit Deferred.t) ‑> unit Deferred.t
iter' t ~f
applies f
to each element of the stream in turn, as they become
available. It continues onto the next element only after the deferred returned by f
becomes determined.
val closed : _ t ‑> unit Deferred.t
closed t
returns a deferred that becomes determined when the end of t
is
reached.
val iter : 'a t ‑> f:('a ‑> unit) ‑> unit
iter t ~f
= don't_wait_for (iter' t ~f:(fun a -> f a; return ()))
val take_until : 'a t ‑> unit Deferred.t ‑> 'a t
take_until t d
returns a stream t'
that has the same elements as t
up until d
becomes determined.
val iter_durably' : 'a t ‑> f:('a ‑> unit Deferred.t) ‑> unit Deferred.t
iter_durably' t ~f
is like iter' t ~f
, except if f
raises an exception it
continues with the next element of the stream *and* reraises the exception (to the
monitor in scope when iter_durably was called).
iter_durably t ~f
is like iter t ~f
, except if f
raises an exception it
continues with the next element of the stream *and* reraises the exception (to the
monitor in scope when iter_durably was called).
iter_durably_report_end t ~f
is equivalent to iter_durably' t ~f:(fun x -> return
(f x))
but it is more efficient
val iter_durably : 'a t ‑> f:('a ‑> unit) ‑> unit
val iter_durably_report_end : 'a t ‑> f:('a ‑> unit) ‑> unit Deferred.t
val length : 'a t ‑> int Deferred.t
length s
returns a deferred that is determined when the end of s is reached, taking
the value of the number of elements in s
val map' : 'a t ‑> f:('a ‑> 'b Deferred.t) ‑> 'b t
map' t f
creates a new stream that with one element, (f v), for each element v of
t.
map t ~f
creates a new stream that with one element, (f v), for each element v of t.
map t f
= map' t ~f:(fun a -> return (f a))
.
first_n t n
returns a stream with the first n elements of t, if t has n or more
elements, or it returns t.
Stream generation ----------------------------------------------------------------------
val unfold : 'b ‑> f:('b ‑> ('a * 'b) option Deferred.t) ‑> 'a t
unfold b f
returns a stream a1; a2; ...; an
whose elements are
determined by the equations:
b0 = b Some (a1, b1) = f b0 Some (a2, b2) = f b1 ... None = f bn
Miscellaneous operations ----------------------------------------------------------------------
val split : ?stop:unit Deferred.t ‑> ?f:('a ‑> [ `Continue | `Found of 'b ]) ‑> 'a t ‑> 'a t * [ `End_of_stream | `Stopped of 'a t | `Found of 'b * 'a t ] Deferred.t
split ~stop ~f t
returns a pair (p, d)
, where p
is a prefix of t
that ends
for one of three reasons:
1. [t] ends 2. stop becomes determined 3. f returns `Found
The deferred d
describes why the prefix ended, and returns the suffix of the
stream in case (2) or (3).
val find : 'a t ‑> f:('a ‑> bool) ‑> [ `End_of_stream | `Found of 'a * 'a t ] Deferred.t
find ~f t
returns a deferred that becomes determined when f x
is true for some
element of t
, or if the end of the stream is reached