An immutable sequence of values, with a possibly incomplete tail that may be extended asynchronously.
For most applications one should use Pipe instead of Stream. One justifiable usage
of Stream rather than Pipe is in single-writer, multi-consumer (multicast)
scenarios where pushback is not required.
The basic primitive operation for getting the next element out of stream is
Stream.next, which (asynchronously) returns the element and the rest of the
stream.
module Deferred = Async_kernel__.Deferred1sexp_of_t t f returns a sexp of all of the elements currently available in the
stream. It is just for display purposes. There is no t_of_sexp.
include sig ... endval sexp_of_t : ('a ‑> Base.Sexp.t) ‑> 'a t ‑> Base.Sexp.tval create : ('a Async_kernel.Tail.t ‑> unit) ‑> 'a tcreate f returns a stream t and calls f tail, where the elements of the stream
are determined as the tail is extended, and the end of the stream is reached when the
tail is closed.
next t returns a deferred that will become determined when the next part of the
stream is determined. This is Cons (v, t'), where v is the next element of the
stream and t' is the rest of the stream, or with Nil at the end of the stream.
val next : 'a t ‑> 'a next Deferred.tval first_exn : 'a t ‑> 'a Deferred.tfirst_exn t returns a deferred that becomes determined with the first element of
t.
Streams can be converted to and from lists. Although, conversion to a list returns a deferred, because the stream is determined asynchronously.
val to_list : 'a t ‑> 'a list Deferred.tto_list t returns a deferred that will become determined with the list
of elements in t, if the end of t is reached.
val of_fun : (unit ‑> 'a Deferred.t) ‑> 'a tof_fun f returns a stream whose elements are determined by calling f forever.
val copy_to_tail : 'a t ‑> 'a Async_kernel.Tail.t ‑> unit Deferred.tcopy_to_tail t tail reads elements from t and puts them in tail, until
the end of t is reached.
Sequence operations ---------------------------------------------------------------------- There are the usual sequence operations:
append, fold, iter, map, filter_map, take
There are also deferred variants:
iter', map', filter_map'
These take anonymous functions that return deferreds generalizing the usual sequence operation and allowing the client to control the rate at which the sequence is processed.
append t1 t2 returns a stream with all the values of t1, in order, and if t1 ends,
these values are followed by all the values of t2.
concat t takes a stream of streams and produces a stream that is the concatenation
of each stream in order (you see all of stream 1, then all of stream 2... etc.)
filter_deprecated s ~f returns a stream with one element, v, for each v in s such
with f v = true.
Using filter_deprecated can easily lead to space leaks. It is better to use
Async.Pipe than Async.Stream.
filter_map_deprecated s ~f returns a stream with one element, v', for each v in s
such with f v = Some v'.
Using filter_map_deprecated can easily lead to space leaks. It is better to use
Async.Pipe than Async.Stream.
val fold' : 'a t ‑> init:'b ‑> f:('b ‑> 'a ‑> 'b Deferred.t) ‑> 'b Deferred.tfold' t ~init ~f is like list fold, walking over the elements of the stream in
order, as they become available. fold' returns a deferred that will yield the final
value of the accumulator, if the end of the stream is reached.
val fold : 'a t ‑> init:'b ‑> f:('b ‑> 'a ‑> 'b) ‑> 'b Deferred.tfold t ~init ~f is a variant of fold' in which f does not return a deferred.
val iter' : 'a t ‑> f:('a ‑> unit Deferred.t) ‑> unit Deferred.titer' t ~f applies f to each element of the stream in turn, as they become
available. It continues onto the next element only after the deferred returned by f
becomes determined.
val closed : _ t ‑> unit Deferred.tclosed t returns a deferred that becomes determined when the end of t is
reached.
val iter : 'a t ‑> f:('a ‑> unit) ‑> unititer t ~f = don't_wait_for (iter' t ~f:(fun a -> f a; return ()))
val take_until : 'a t ‑> unit Deferred.t ‑> 'a ttake_until t d returns a stream t' that has the same elements as t up until d
becomes determined.
val iter_durably' : 'a t ‑> f:('a ‑> unit Deferred.t) ‑> unit Deferred.titer_durably' t ~f is like iter' t ~f, except if f raises an exception it
continues with the next element of the stream *and* reraises the exception (to the
monitor in scope when iter_durably was called).
iter_durably t ~f is like iter t ~f, except if f raises an exception it
continues with the next element of the stream *and* reraises the exception (to the
monitor in scope when iter_durably was called).
iter_durably_report_end t ~f is equivalent to iter_durably' t ~f:(fun x -> return
(f x)) but it is more efficient
val iter_durably : 'a t ‑> f:('a ‑> unit) ‑> unitval iter_durably_report_end : 'a t ‑> f:('a ‑> unit) ‑> unit Deferred.tval length : 'a t ‑> int Deferred.tlength s returns a deferred that is determined when the end of s is reached, taking
the value of the number of elements in s
val map' : 'a t ‑> f:('a ‑> 'b Deferred.t) ‑> 'b tmap' t f creates a new stream that with one element, (f v), for each element v of
t.
map t ~f creates a new stream that with one element, (f v), for each element v of t.
map t f = map' t ~f:(fun a -> return (f a)).
first_n t n returns a stream with the first n elements of t, if t has n or more
elements, or it returns t.
Stream generation ----------------------------------------------------------------------
val unfold : 'b ‑> f:('b ‑> ('a * 'b) option Deferred.t) ‑> 'a tunfold b f returns a stream a1; a2; ...; an whose elements are
determined by the equations:
b0 = b
Some (a1, b1) = f b0
Some (a2, b2) = f b1
...
None = f bnMiscellaneous operations ----------------------------------------------------------------------
val split : ?stop:unit Deferred.t ‑> ?f:('a ‑> [ `Continue | `Found of 'b ]) ‑> 'a t ‑> 'a t * [ `End_of_stream | `Stopped of 'a t | `Found of 'b * 'a t ] Deferred.tsplit ~stop ~f t returns a pair (p, d), where p is a prefix of t that ends
for one of three reasons:
1. [t] ends
2. stop becomes determined
3. f returns `Found
The deferred d describes why the prefix ended, and returns the suffix of the
stream in case (2) or (3).
val find : 'a t ‑> f:('a ‑> bool) ‑> [ `End_of_stream | `Found of 'a * 'a t ] Deferred.tfind ~f t returns a deferred that becomes determined when f x is true for some
element of t, or if the end of the stream is reached