module Pipe: Pipetype ('a, 'phantom) t
type('a, 'phantom)pipe =('a, 'phantom) t
module Writer:sig..end
module Reader:sig..end
val create : unit -> 'a Reader.t * 'a Writer.tcreate () creates a new pipe.val init : ('a Writer.t -> unit Deferred.t) -> 'a Reader.tinit f creates a new pipe, applies f to its writer end, and returns its reader
end. init closes the writer end when the result of f becomes determined. If f
raises, the writer end is closed and the exception is raised to the caller of
init.val of_list : 'a list -> 'a Reader.tof_list l returns a closed pipe reader filled with the contents of l.val close : 'a Writer.t -> unitclose t closes the write end of the pipe:
`Eof.`Eof.Close is idempotent.
close_read t closes both the read and write ends of the pipe. It does everything
close does, and in addition:
`Reader_closed.`Eofval close_read : 'a Reader.t -> unitval is_closed : ('a, 'b) t -> boolis_closed t returns true iff close t or close_read t has been called.val closed : ('a, 'b) t -> unit Deferred.tclosed t returns a deferred that becomes determined when close t or close_read t
is called.module Flushed_result:sig..end
val upstream_flushed : ('a, 'b) t -> Flushed_result.t Deferred.tupstream_flushed and downstream_flushed becomes determined
when all values written prior to the call have been consumed, or if the reader end of
the pipe is closed. The difference between "upstream" and "downstream" comes if one
has a chain of pipes that are linked (e.g. by Pipe.map):
P1 --> P2 --> P3
Calling downstream_flushed P2 ensures that everything in P2 has made it out of P3.
Calling upstream_flushed P2 ensures that everything in P1 has made it out of P3.
More generally, downstream_flushed starts at the current pipe and follows the chain
to the final downstream consumer(s). upstream_flushed follows the chain to the
initial upstream pipe(s), and then calls downstream_flushed.
For a pipe in isolation, "consumed" means "read from the pipe". However, for pipes
linked together with transfer or any function built from transfer, "consumed"
means "propagated all the way downstream through the chain and read from the final
pipe in the chain". Furthermore, for a pipe ultimately connected to an
Async.Writer, "consumed" means the OS write() system call has completed on the bytes
read from the final pipe in the chain.
The following Pipe functions automatically link their input and output pipes
together so that flushed on upstream pipes will propagate to downstream pipes:
transfer*, map*, filter_map*, filter, interleave, concat. There is *not*
automatic linking with iter*; however, user code can customize the behavior of
flush functions using Consumer, see below.
val downstream_flushed : ('a, 'b) t -> Flushed_result.t Deferred.tmodule Consumer:sig..end
val add_consumer : 'a Reader.t ->
downstream_flushed:(unit -> Flushed_result.t Deferred.t) ->
Consumer.tadd_consumer reader ~downstream_flushed creates a new consumer of reader, and
causes future calls to flushed_downstream reader to take this consumer into account.
Thereafter, Pipe.flushed_downstream reader will first ensure that values previously
written to reader have been read, then that they have been sent downstream by the
consumer that read them, and finally that they have been flushed downstream.(_, _) t, that is, both readers and
writers.val length : ('a, 'b) t -> intlength t returns the number of elements currently queued in tval is_empty : ('a, 'b) t -> boolis_empty t is true iff there are no values in the pipe.
Producers that write a sequence of values to a pipe should be aware that the consumers
who read from the pipe can close the pipe early -- that is, before the producer has
finished doing all of its writes. If this happens, further writes will raise an
exception. To avoid these errors, all writes must be atomically guarded by
is_closed tests. Thus, a typical writer loop should look like this:
fun countup hi w = (* Send the ints in range [0,hi) to writer W. *)
let rec loop i =
if i < hi and not (is_closed w) then (* Guard write w/closed test. *)
write i w >>> (* Do the write then block until datum *)
fun () -> loop (i+1) (* fits or the pipe is closed. *)
else close w (* No harm done if reader has already closed the pipe.*)
in
loop 0
If the pipe's consumer stops reading early and closes the pipe, countup won't error
out trying to write further values down the pipe: it will immediately wake up and
exit.
val pushback : 'a Writer.t -> unit Deferred.tpushback writer becomes determined when either writer has been closed or
the pipe is empty.val write' : 'a Writer.t -> 'a Core.Std.Queue.t -> unit Deferred.twrite' writer q transfers the elements from q into the pipe, leaving q empty.
write' returns a pushback deferred, as described above. Writing to a closed pipe
raises.
write writer v is equivalent to write' writer (Queue.singleton v).
val write : 'a Writer.t -> 'a -> unit Deferred.tval write_without_pushback' : 'a Writer.t -> 'a Core.Std.Queue.t -> unitwrite_without_pushback' and write_without_pushback are alternatives to write'
and write that can be used when you don't care about the resultant deferred. The
data is added to the pipe and then we return immediately.
write' t values is equivalent to write_without_pushback' t values; pushback t (and
similarly for write).
val write_without_pushback : 'a Writer.t -> 'a -> unitval write_when_ready : 'a Writer.t ->
f:(('a -> unit) -> 'b) -> [ `Closed | `Ok of 'b ] Deferred.twrite_when_ready writer ~f waits until there is space available in the pipe, and
then calls f write, where write can be used by f to write a single value into
the pipe at a time. write_when_ready guarantees that the pipe is open when it calls
f, and hence that the writes will succeed, unless f itself closes the pipe.
The forward-progress semantics means that every call produces some data, so you can process an n-element input with at most n reads; you cannot burn an unbounded number of cycles "spinning" doing an unbounded number of empty-result "polling" calls (which, in a non-preemptive system like Async could lock up the process).
The two exceptions to best-effort/forward-progress semantics are read_now, which
polls for data, thus abandoning the forward-progress guarantee, and read_exactly,
which loops until it has read the entire amount requested (or encountered EOF), thus
abandoning the best-effort guarantee of timeliness.
val read' : ?consumer:Consumer.t ->
'a Reader.t -> [ `Eof | `Ok of 'a Core.Std.Queue.t ] Deferred.tread' pipe reads all of the values available in the pipe, as soon as any value
becomes available. The resulting queue will satisfy Q.length q > 0. The consumer
is used to extend the meaning of values being flushed (see the Consumer module
above).val read : ?consumer:Consumer.t ->
'a Reader.t -> [ `Eof | `Ok of 'a ] Deferred.tread pipe reads a single value from the pipe. The consumer is used to extend the
meaning of values being flushed (see the Consumer module above).val read_at_most : ?consumer:Consumer.t ->
'a Reader.t ->
num_values:int -> [ `Eof | `Ok of 'a Core.Std.Queue.t ] Deferred.tread_at_most r ~num_values reads up to num_values values from the pipe's currently
available data, blocking if the pipe is empty. The resulting queue will satisfy 0 <
Queue.length q <= num_values. read_at_most raises if num_values <= 0. The
consumer is used to extend the meaning of values being flushed (see the Consumer
module above).val read_exactly : ?consumer:Consumer.t ->
'a Reader.t ->
num_values:int ->
[ `Eof | `Exactly of 'a Core.Std.Queue.t | `Fewer of 'a Core.Std.Queue.t ]
Deferred.tread_exactly r ~num_values reads exactly num_values items, unless EOF is
encountered. read_exactly performs a sequence of read_at_most operations, so
there is no guarantee that the queue of values it returns comprise a contiguous
segment of the written stream of values -- other readers might pick off elements
in-between read_exactly's atomic reads. read_exactly raises if num_values <= 0.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).val read_now' : ?consumer:Consumer.t ->
'a Reader.t ->
[ `Eof | `Nothing_available | `Ok of 'a Core.Std.Queue.t ]read_now' reader reads all of the values from reader that are immediately
available. The resulting queue will satisfy Q.length q > 0. If reader is closed,
read_now' returns `Eof. If reader is empty, read_now' returns
`Nothing_available. read_now' has the danger of permitting the computation to
"spin" doing empty reads; it is only useful in exotic circumstances. The consumer
is used to extend the meaning of values being flushed (see the Consumer module
above).
read_now is like read_now', except that it reads a single value rather than
everything that is available.
val read_now : ?consumer:Consumer.t ->
'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a ]val peek : 'a Reader.t -> 'a optionval clear : 'a Reader.t -> unitclear reader consumes all of the values currently in reader, and all blocked
flushes become determined with `Ok.val read_all : 'a Reader.t -> 'a Core.Std.Queue.t Deferred.tread_all reader reads all the values from the pipe until it is closed. An
alternative name might be Reader.to_queue.val values_available : 'a Reader.t -> [ `Eof | `Ok ] Deferred.tvalues_available reader returns a deferred that becomes determined when there are
values in the pipe. If there are multiple readers (a rare situation), there is no
guarantee that some other reader hasn't become active because of ordinary async
scheduling and removed some or all of the values between the time the result of
values_available becomes determined and the time something waiting upon that
result runs.
values_available is useful when one wants to choose on values being available in a
pipe, so that one can be sure and not remove values and drop them on the floor.
values_available is roughly equivalent to read_at_most ~num_values:0.
Each of the sequence functions (fold, iter, transfer, map) comes in two versions:
"scalar" and "batch" processing. The scalar version has the ordinary type for f,
which handles an element at a time in a non-deferred way. In the batch version, f
deals with a queue of elements from the pipe at a time, and can block, which will
cause pushback on writers due to elements not being consumed.
Some functions (transfer, map, filter_map, filter, interleave, concat, and
their primed, batch-processing variants) spawn a background task that copies data from
some upstream pipe to some downstream pipe, perhaps with some processing inserted
in-between. These copying tasks finish under two circumstances. The standard,
"normal" case is when the copying task gets EOF from the upstream pipe -- there is no
more data to copy. In this case, the copying task closes the downstream pipe and
exits.
Somewhat less common is when the downstream consumer decides to stop reading early, while the upstream producer is still sending data to the copy task. (E.g., perhaps the consumer was searching its incoming stream for some value, and it found that value, so there's no need to search further.) In this case, the consumer closes its pipe to indicate it's done reading values. When the copy task discovers that its downstream pipe is closed, it propagate the close to the upstream producer by closing its pipe and stops processing.
type('a, 'b, 'c, 'accum)fold =?consumer:Consumer.t ->
'a Reader.t ->
init:'accum -> f:('accum -> 'b -> 'c) -> 'accum Deferred.t
fold' reader ~init ~f reads a batch of elements from reader, supplies them to f,
waits for f to finish, and then repeats. fold' finishes when the call to f on
the final batch of elements from reader finishes.
fold reader ~init ~f folds over the elements of reader, consuming them as they
come in. fold finishes when the final call to f returns.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
val fold' : ('a, 'a Core.Std.Queue.t, 'accum Deferred.t, 'accum) foldval fold : ('a, 'a, 'accum, 'accum) foldtype('a, 'b, 'c)iter =?consumer:Consumer.t ->
?continue_on_error:bool ->
'a Reader.t -> f:('b -> 'c) -> unit Deferred.t
iter' reader ~f repeatedly applies f to batches of elements of reader, waiting
for each call to f to finish before continuing. The deferred returned by iter'
becomes determined when the call to f on the final batch of elements finishes.
iter is a specialization of iter' that applies the supplied f to each element in
the batch, waiting for one call to f to finish before making the next call to f.
iter_without_pushback is a specialized version that applies f to each element
that arrives on the pipe, without giving f a chance to pushback on the iteration
continuing.
Supplying ~continue_on_error:true causes the iteration to continue even if f
raises.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
val iter' : ('a, 'a Core.Std.Queue.t, unit Deferred.t) iterval iter : ('a, 'a, unit Deferred.t) iterval iter_without_pushback : ('a, 'a, unit) iterval transfer' : 'a Reader.t ->
'b Writer.t ->
f:('a Core.Std.Queue.t -> 'b Core.Std.Queue.t Deferred.t) -> unit Deferred.ttransfer' input output ~f ?stop repeatedly reads a batch of elements from input,
applies f to the batch, writes the result as a batch to output, and then waits on
pushback in output before continuing. transfer' finishes if input is closed,
or output is closed, or stop is determined. If output is closed, then
transfer' closes input.
transfer is a specialization of transfer' that uses Queue.map ~f.
transfer_id is a specialization of transfer' wifh f = Fn.id.
val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Deferred.tval transfer_id : 'a Reader.t -> 'a Writer.t -> unit Deferred.tval map' : 'a Reader.t ->
f:('a Core.Std.Queue.t -> 'b Core.Std.Queue.t Deferred.t) -> 'b Reader.tmap' input ~f returns a reader, output, and repeatedly applies f to batches of
elements from input, with the results appearing in output. If values are not
being consumed from output, map' will pushback and stop consuming values from
input.
If output is closed, then map' will close input.
map is a specialization of map' that uses Queue.map ~f.
val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.tval filter_map' : 'a Reader.t -> f:('a -> 'b option Deferred.t) -> 'b Reader.tfilter_map' input ~f returns a reader, output, and repeatedly applies f to
elements from input, with the results that aren't None appearing in output. If
values are not being consumed from output, filter_map' will pushback and stop
consuming values from input.
If output is closed, then filter_map' will close input.
filter_map is a specialized version of filter_map'.
val filter_map : 'a Reader.t -> f:('a -> 'b option) -> 'b Reader.tval filter : 'a Reader.t -> f:('a -> bool) -> 'a Reader.tfilter input ~f returns a reader, output, and copies to output each element from
input that satisfies the predicate f. If output is closed, then filter closes
input.val interleave : 'a Reader.t list -> 'a Reader.tinterleave inputs returns a reader output, and, for each input, transfers batches
of values from that input to output, using transfer_id. Each input is transferred
to output independently. So, batches of values from different inputs can be in
flight to output simultaneously, but at most one batch at a time from any particular
input. The operation is complete when either all the inputs produce EOF, or when
output is closed by the downstream consumer (in which case interleave closes all
the inputs).val concat : 'a Reader.t list -> 'a Reader.tconcat inputs return a reader, output, with the values from each pipe in inputs
in sequence. concat closes output once it reaches EOF on the final input.
If output is closed, then concat closes all its inputs.val to_stream_deprecated : 'a Reader.t -> 'a Async_stream.tto_stream_deprecated reader returns a stream that reads everything from the pipe.
This function is deprecated because one should change the code that is consuming
a stream to instead consume from a pipe reader.val of_stream_deprecated : 'a Async_stream.t -> 'a Reader.tof_stream_deprecated reader return a pipe that has one element for every element on
the stream. This function is deprecated because one should change the code that is
producing a stream to instead produce a pipe reader.val drain : 'a Reader.t -> unit Deferred.tdrain reader repeatedly reads values from reader and throws them away.
drain_and_count is like drain, except it also counts the number of values it
has read.
val drain_and_count : 'a Reader.t -> int Deferred.tval to_list : 'a Reader.t -> 'a list Deferred.tto_list input reads everything from input; on EOF, it produces the accumulated
list of these values.val hash : ('a, 'b) t -> inthash a hash function suitable for pipesval equal : ('a, 'b) t -> ('a, 'b) t -> boolequal on pipes is physical equality.val size_budget : ('a, 'b) t -> int
Every pipe's initial size budget is zero.
val set_size_budget : ('a, 'b) t -> int -> unitset_size_budget t i changes the size budget of t to i. Any nonnegative value is
allowed.val show_debug_messages : bool Pervasives.refshow_debug_messages, if true will cause a message to be printed at the start of each
operation, showing the pipe and other arguments.val check_invariant : bool Pervasives.refcheck_invariant, if true, will cause pipes' invariants to be checked at the start of
each operation.val sexp_of_t : ('a -> Sexplib.Sexp.t) ->
('phantom -> Sexplib.Sexp.t) -> ('a, 'phantom) t -> Sexplib.Sexp.tval sexp_of_pipe : ('a -> Sexplib.Sexp.t) ->
('phantom -> Sexplib.Sexp.t) -> ('a, 'phantom) pipe -> Sexplib.Sexp.tcreate () creates a new pipe.init f creates a new pipe, applies f to its writer end, and returns its reader
end. init closes the writer end when the result of f becomes determined. If f
raises, the writer end is closed and the exception is raised to the caller of
init.of_list l returns a closed pipe reader filled with the contents of l.close t closes the write end of the pipe:
`Eof.`Eof.Close is idempotent.
close_read t closes both the read and write ends of the pipe. It does everything
close does, and in addition:
`Reader_closed.`Eofis_closed t returns true iff close t or close_read t has been called.closed t returns a deferred that becomes determined when close t or close_read t
is called.upstream_flushed and downstream_flushed becomes determined
when all values written prior to the call have been consumed, or if the reader end of
the pipe is closed. The difference between "upstream" and "downstream" comes if one
has a chain of pipes that are linked (e.g. by Pipe.map):
P1 --> P2 --> P3
Calling downstream_flushed P2 ensures that everything in P2 has made it out of P3.
Calling upstream_flushed P2 ensures that everything in P1 has made it out of P3.
More generally, downstream_flushed starts at the current pipe and follows the chain
to the final downstream consumer(s). upstream_flushed follows the chain to the
initial upstream pipe(s), and then calls downstream_flushed.
For a pipe in isolation, "consumed" means "read from the pipe". However, for pipes
linked together with transfer or any function built from transfer, "consumed"
means "propagated all the way downstream through the chain and read from the final
pipe in the chain". Furthermore, for a pipe ultimately connected to an
Async.Writer, "consumed" means the OS write() system call has completed on the bytes
read from the final pipe in the chain.
The following Pipe functions automatically link their input and output pipes
together so that flushed on upstream pipes will propagate to downstream pipes:
transfer*, map*, filter_map*, filter, interleave, concat. There is *not*
automatic linking with iter*; however, user code can customize the behavior of
flush functions using Consumer, see below.
A Consumer is used to augment our notion of flushing (Pipe.flushed) to include
the time spent processing an element once it has been removed from the pipe. It can
be thought of as sitting at the end of a pipe, or between two pipes, and it provides
more detailed feedback on the time an element spends outside of the pipe proper.
So we have the following two cases:
Pipe --> Consumer
Pipe --> Consumer --> Pipe --> ...
The time outside of the pipe can be broken down into two parts: a part (probably short lived) during which the consumer processes the elements in some way, and a downstream portion where the consumer acts as a sentinal to report when the element has been fully processed.
For instance, consider the simple case of a pipe attached to an Async.Std.Writer
that is writing elements to disk. Part one would be whatever transform the consumer
applies to the elements in the pipe before it hands them off to the writer, and part
two would be waiting for the writer to finish writing the transformed element to
disk. A more complex case is chaining two pipes together (maybe with a transform
like map). Part one in this case is the transform and the write to the downstream
pipe, and part two is waiting for that pipe (and any further pipes in the chain) to
flush.
In each case the consumer is responsible for indicating when:
read and read') and calling values_sent_downstream when it has
sent the values downstream.~downstream_flushed when add_consumer is called).read' but are processed over a long
period.add_consumer reader ~downstream_flushed creates a new consumer of reader, and
causes future calls to flushed_downstream reader to take this consumer into account.
Thereafter, Pipe.flushed_downstream reader will first ensure that values previously
written to reader have been read, then that they have been sent downstream by the
consumer that read them, and finally that they have been flushed downstream.(_, _) t, that is, both readers and
writers.length t returns the number of elements currently queued in tis_empty t is true iff there are no values in the pipe.
Producers that write a sequence of values to a pipe should be aware that the consumers
who read from the pipe can close the pipe early -- that is, before the producer has
finished doing all of its writes. If this happens, further writes will raise an
exception. To avoid these errors, all writes must be atomically guarded by
is_closed tests. Thus, a typical writer loop should look like this:
fun countup hi w = (* Send the ints in range [0,hi) to writer W. *)
let rec loop i =
if i < hi and not (is_closed w) then (* Guard write w/closed test. *)
write i w >>> (* Do the write then block until datum *)
fun () -> loop (i+1) (* fits or the pipe is closed. *)
else close w (* No harm done if reader has already closed the pipe.*)
in
loop 0
If the pipe's consumer stops reading early and closes the pipe, countup won't error
out trying to write further values down the pipe: it will immediately wake up and
exit.
pushback writer becomes determined when either writer has been closed or
the pipe is empty.
write' writer q transfers the elements from q into the pipe, leaving q empty.
write' returns a pushback deferred, as described above. Writing to a closed pipe
raises.
write writer v is equivalent to write' writer (Queue.singleton v).
write_without_pushback' and write_without_pushback are alternatives to write'
and write that can be used when you don't care about the resultant deferred. The
data is added to the pipe and then we return immediately.
write' t values is equivalent to write_without_pushback' t values; pushback t (and
similarly for write).
write_when_ready writer ~f waits until there is space available in the pipe, and
then calls f write, where write can be used by f to write a single value into
the pipe at a time. write_when_ready guarantees that the pipe is open when it calls
f, and hence that the writes will succeed, unless f itself closes the pipe.
The forward-progress semantics means that every call produces some data, so you can process an n-element input with at most n reads; you cannot burn an unbounded number of cycles "spinning" doing an unbounded number of empty-result "polling" calls (which, in a non-preemptive system like Async could lock up the process).
The two exceptions to best-effort/forward-progress semantics are read_now, which
polls for data, thus abandoning the forward-progress guarantee, and read_exactly,
which loops until it has read the entire amount requested (or encountered EOF), thus
abandoning the best-effort guarantee of timeliness.
read' pipe reads all of the values available in the pipe, as soon as any value
becomes available. The resulting queue will satisfy Q.length q > 0. The consumer
is used to extend the meaning of values being flushed (see the Consumer module
above).
read pipe reads a single value from the pipe. The consumer is used to extend the
meaning of values being flushed (see the Consumer module above).
read_at_most r ~num_values reads up to num_values values from the pipe's currently
available data, blocking if the pipe is empty. The resulting queue will satisfy 0 <
Queue.length q <= num_values. read_at_most raises if num_values <= 0. The
consumer is used to extend the meaning of values being flushed (see the Consumer
module above).
read_exactly r ~num_values reads exactly num_values items, unless EOF is
encountered. read_exactly performs a sequence of read_at_most operations, so
there is no guarantee that the queue of values it returns comprise a contiguous
segment of the written stream of values -- other readers might pick off elements
in-between read_exactly's atomic reads. read_exactly raises if num_values <= 0.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
read_now' reader reads all of the values from reader that are immediately
available. The resulting queue will satisfy Q.length q > 0. If reader is closed,
read_now' returns `Eof. If reader is empty, read_now' returns
`Nothing_available. read_now' has the danger of permitting the computation to
"spin" doing empty reads; it is only useful in exotic circumstances. The consumer
is used to extend the meaning of values being flushed (see the Consumer module
above).
read_now is like read_now', except that it reads a single value rather than
everything that is available.
clear reader consumes all of the values currently in reader, and all blocked
flushes become determined with `Ok.
read_all reader reads all the values from the pipe until it is closed. An
alternative name might be Reader.to_queue.
values_available reader returns a deferred that becomes determined when there are
values in the pipe. If there are multiple readers (a rare situation), there is no
guarantee that some other reader hasn't become active because of ordinary async
scheduling and removed some or all of the values between the time the result of
values_available becomes determined and the time something waiting upon that
result runs.
values_available is useful when one wants to choose on values being available in a
pipe, so that one can be sure and not remove values and drop them on the floor.
values_available is roughly equivalent to read_at_most ~num_values:0.
Each of the sequence functions (fold, iter, transfer, map) comes in two versions:
"scalar" and "batch" processing. The scalar version has the ordinary type for f,
which handles an element at a time in a non-deferred way. In the batch version, f
deals with a queue of elements from the pipe at a time, and can block, which will
cause pushback on writers due to elements not being consumed.
Some functions (transfer, map, filter_map, filter, interleave, concat, and
their primed, batch-processing variants) spawn a background task that copies data from
some upstream pipe to some downstream pipe, perhaps with some processing inserted
in-between. These copying tasks finish under two circumstances. The standard,
"normal" case is when the copying task gets EOF from the upstream pipe -- there is no
more data to copy. In this case, the copying task closes the downstream pipe and
exits.
Somewhat less common is when the downstream consumer decides to stop reading early, while the upstream producer is still sending data to the copy task. (E.g., perhaps the consumer was searching its incoming stream for some value, and it found that value, so there's no need to search further.) In this case, the consumer closes its pipe to indicate it's done reading values. When the copy task discovers that its downstream pipe is closed, it propagate the close to the upstream producer by closing its pipe and stops processing.
fold' reader ~init ~f reads a batch of elements from reader, supplies them to f,
waits for f to finish, and then repeats. fold' finishes when the call to f on
the final batch of elements from reader finishes.
fold reader ~init ~f folds over the elements of reader, consuming them as they
come in. fold finishes when the final call to f returns.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
iter' reader ~f repeatedly applies f to batches of elements of reader, waiting
for each call to f to finish before continuing. The deferred returned by iter'
becomes determined when the call to f on the final batch of elements finishes.
iter is a specialization of iter' that applies the supplied f to each element in
the batch, waiting for one call to f to finish before making the next call to f.
iter_without_pushback is a specialized version that applies f to each element
that arrives on the pipe, without giving f a chance to pushback on the iteration
continuing.
Supplying ~continue_on_error:true causes the iteration to continue even if f
raises.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
transfer' input output ~f ?stop repeatedly reads a batch of elements from input,
applies f to the batch, writes the result as a batch to output, and then waits on
pushback in output before continuing. transfer' finishes if input is closed,
or output is closed, or stop is determined. If output is closed, then
transfer' closes input.
transfer is a specialization of transfer' that uses Queue.map ~f.
transfer_id is a specialization of transfer' wifh f = Fn.id.
map' input ~f returns a reader, output, and repeatedly applies f to batches of
elements from input, with the results appearing in output. If values are not
being consumed from output, map' will pushback and stop consuming values from
input.
If output is closed, then map' will close input.
map is a specialization of map' that uses Queue.map ~f.
filter_map' input ~f returns a reader, output, and repeatedly applies f to
elements from input, with the results that aren't None appearing in output. If
values are not being consumed from output, filter_map' will pushback and stop
consuming values from input.
If output is closed, then filter_map' will close input.
filter_map is a specialized version of filter_map'.
filter input ~f returns a reader, output, and copies to output each element from
input that satisfies the predicate f. If output is closed, then filter closes
input.
interleave inputs returns a reader output, and, for each input, transfers batches
of values from that input to output, using transfer_id. Each input is transferred
to output independently. So, batches of values from different inputs can be in
flight to output simultaneously, but at most one batch at a time from any particular
input. The operation is complete when either all the inputs produce EOF, or when
output is closed by the downstream consumer (in which case interleave closes all
the inputs).
concat inputs return a reader, output, with the values from each pipe in inputs
in sequence. concat closes output once it reaches EOF on the final input.
If output is closed, then concat closes all its inputs.
to_stream_deprecated reader returns a stream that reads everything from the pipe.
This function is deprecated because one should change the code that is consuming
a stream to instead consume from a pipe reader.
of_stream_deprecated reader return a pipe that has one element for every element on
the stream. This function is deprecated because one should change the code that is
producing a stream to instead produce a pipe reader.
drain reader repeatedly reads values from reader and throws them away.
drain_and_count is like drain, except it also counts the number of values it
has read.
to_list input reads everything from input; on EOF, it produces the accumulated
list of these values.
hash a hash function suitable for pipesequal on pipes is physical equality.
Every pipe's initial size budget is zero.
set_size_budget t i changes the size budget of t to i. Any nonnegative value is
allowed.
show_debug_messages, if true will cause a message to be printed at the start of each
operation, showing the pipe and other arguments.check_invariant, if true, will cause pipes' invariants to be checked at the start of
each operation.