These provide reader- and writer-specific types for the base pipe type.
init f creates a new pipe, applies
f to its writer end, and returns its reader
init closes the writer end when the result of
f becomes determined. If
raises, the writer end is closed and the exception is raised to the caller of
init_reader is symmetric. It creates a new pipe, applies
f to its reader end, and
returns its writer end.
close_read when the result of
determined or if
f raises, and any exception is raised to the caller of
of_list l returns a closed pipe reader filled with the contents of
unfold ~init ~f returns a pipe that it fills with
'as by repeatedly applying
to values of the state type
None, the resulting pipe is
unfold respects pushback on the resulting pipe.
For example, to create a pipe of natural numbers:
Pipe.unfold ~init:0 ~f:(fun n -> return (Some (n, n+1)))
to_sequence reader returns a sequence that can be consumed to extract values from
Wait_for d is returned the consumer must wait for
d to become
determined before pulling the next value. Repeatedly asking for the next value
without waiting on
d will infinite loop.
close t closes the write end of the pipe:
Thus, after a pipe has been closed, reads never block.
Close is idempotent.
close_read t closes both the read and write ends of the pipe. It does everything
close does, and in addition:
is_closed t returns
close t or
close_read t has been called.
Deferreds returned by
downstream_flushed become determined
when all values written prior to the call have been consumed, or if the reader end of
the pipe is closed. The difference between "upstream" and "downstream" comes if one
has a chain of pipes that are linked (e.g. by
P1 --> P2 --> P3
downstream_flushed P2 ensures that everything in P2 has made it out of P3.
upstream_flushed P2 ensures that everything in P1 has made it out of P3.
downstream_flushed starts at the current pipe and follows the chain
to the final downstream consumer(s).
upstream_flushed follows the chain to the
initial upstream pipe(s), and then calls
For a pipe in isolation, "consumed" means "read from the pipe". However, for pipes
linked together with
transfer or any function built from
means "propagated all the way downstream through the chain and read from the final
pipe in the chain". Furthermore, for a pipe ultimately connected to an
Async.Writer, "consumed" means the OS write() system call has completed on the bytes
read from the final pipe in the chain.
Pipe functions automatically link their input and output pipes
together so that
*_flushed on upstream pipes will propagate to downstream pipes:
concat. There is *not*
automatic linking with
iter*; however, user code can customize the behavior of
flush functions using
Consumer, see below.
add_consumer reader ~downstream_flushed creates a new consumer of
causes future calls to
flushed_downstream reader to take this consumer into account.
Pipe.flushed_downstream reader will first ensure that values previously
reader have been read, then that they have been sent downstream by the
consumer that read them, and finally that they have been flushed downstream.
One should only supply the resulting consumer to read operations on
a consumer created from one reader with another reader will raise an exception.
These operations apply to all values of type
(_, _) t, that is, both readers and
length t returns the number of elements currently queued in
is_empty t is true iff there are no values in the pipe.
The write operations return a deferred value that is determined when either (1) it is OK to write again to the pipe or (2) the pipe has been closed. This deferred is the data-producer's interface to the pipe pushback mechanism: it tells the producer when it should proceed after doing a write -- either to produce and write more data to the pipe, or to abandon production entirely. The pushback mechansim is just advisory: a producer task can, but typically should not, dump arbitrary amounts of data into a pipe even if there is no consumer draining the pipe.
Producers that write a sequence of values to a pipe should be aware that the consumers
who read from the pipe can close the pipe early -- that is, before the producer has
finished doing all of its writes. If this happens, further writes will raise an
exception. To avoid these errors, all writes must be atomically guarded by
is_closed tests. Thus, a typical writer loop should look like this:
fun countup hi w = (* Send the ints in range [0,hi) to writer W. *) let rec loop i = if i < hi and not (is_closed w) then (* Guard write w/closed test. *) write i w >>> (* Do the write then block until datum *) fun () -> loop (i+1) (* fits or the pipe is closed. *) else close w (* No harm done if reader has already closed the pipe.*) in loop 0
If the pipe's consumer stops reading early and closes the pipe,
countup won't error
out trying to write further values down the pipe: it will immediately wake up and
write writer a enqueues
writer, returning a pushback deferred, as described
transfer_in writer ~from:q transfers the elements from
q empty, and returning a pushback deferred.
transfer_in_without_pushback are alternatives to
write that can be used when you don't care about the pushback
deferred. They add data to the pipe and return immediately.
The following equivalences hold:
write t a = write_without_pushback t a; pushback t
transfer_in t ~from = transfer_in_without_pushback t ~from; pushback t
is_closed writer, then all of these functions raise.
write_when_ready writer ~f waits until there is space available in the pipe, and
f write, where
write can be used by
f to write a single value into
the pipe at a time.
write_when_ready guarantees that the pipe is open when it calls
f, and hence that the writes will succeed, unless
f itself closes the pipe.
write_if_open w e is equivalent to:
let x = e in if not (is_closed w) then write w x else Deferred.unit
Note the difference in allocation and potential side effects when
w is closed and
e is a complex expression.
write_without_pushback_if_open is the same as
write_if_open, except it calls
write_without_pushback instead of
With two special exceptions, all read procedures have a best-effort/forward-progress semantics:
The best-effort semantics allows you to program in a style that processes data in big slabs, yet also moves data through your processing in as timely a way as possible.
The forward-progress semantics means that every call produces some data, so you can process an n-element input with at most n reads; you cannot burn an unbounded number of cycles "spinning" doing an unbounded number of empty-result "polling" calls (which, in a non-preemptive system like Async could lock up the process).
The two exceptions to best-effort/forward-progress semantics are
polls for data, thus abandoning the forward-progress guarantee, and
which loops until it has read the entire amount requested (or encountered EOF), thus
abandoning the best-effort guarantee of timeliness.
read' pipe reads values available in the pipe, as soon as any value becomes
available. The resulting queue will satisfy
0 < Queue.length q <= max_queue_length.
read' raises if
max_queue_length <= 0. The
consumer is used to extend the
meaning of values being flushed (see the
Consumer module above).
read_exactly r ~num_values reads exactly
num_values items, unless EOF is
read_exactly performs a sequence of
read_at_most operations, so
there is no guarantee that the queue of values it returns comprise a contiguous
segment of the written stream of values -- other readers might pick off elements
read_exactly's atomic reads.
read_exactly raises if
num_values <= 0.
consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
read_now' reader reads values from
reader that are immediately available. The
resulting queue will satisfy
0 <= Q.length q <= max_queue_length. If
reader is empty,
consumer is used to extend the meaning of values being
flushed (see the
Consumer module above).
clear reader consumes all of the values currently in
reader, and all blocked
flushes become determined with
values_available reader returns a deferred that becomes determined when there are
values in the pipe. If there are multiple readers (a rare situation), there is no
guarantee that some other reader hasn't become active because of ordinary Async
scheduling and removed some or all of the values between the time the result of
values_available becomes determined and the time something waiting
values_available is useful when one wants to
choose on values being available in a
pipe, so that one can be sure and not remove values and drop them on the floor.
values_available is roughly equivalent to
f, which handles an element at a time in a non-deferred way. In the batch version,
fdeals with a queue of elements from the pipe at a time, and can block, which will cause pushback on writers due to elements not being consumed.
concat, and their primed, batch-processing variants) spawn a background task that copies data from some upstream pipe to some downstream pipe, perhaps with some processing inserted in-between. These copying tasks finish under two circumstances. The standard, "normal" case is when the copying task gets EOF from the upstream pipe -- there is no more data to copy. In this case, the copying task closes the downstream pipe, if necessary, and exits.
fold' reader ~init ~f reads a batch of elements from
reader, supplies them to
f to finish, and then repeats.
fold' finishes when the call to
the final batch of elements from
iter' reader ~f repeatedly applies
f to batches of elements of
for each call to
f to finish before continuing. The deferred returned by
becomes determined when the call to
f on the final batch of elements finishes.
~continue_on_error:true causes the iteration to continue even if
~consumer is used to extend the meaning of values being flushed (see the
iter_without_pushback t ~f applies
f to each element in
t, without giving
chance to pushback on the iteration continuing. If
f raises on some element of
iter_without_pushback will not consume any further elements.
iter_without_pushback will not make more than
max_iterations_per_job calls to
in a single Async_job; this can be used to increase Async-scheduling fairness.
transfer' input output ~f repeatedly reads a batch of elements from
f to the batch, writes the result as a batch to
output, and then waits on
output before continuing.
transfer' finishes if
input is closed
output is closed. If
output is closed, then
map' input ~f returns a reader,
output, and repeatedly applies
f to batches of
input, with the results appearing in
output. If values are not
being consumed from
map' will pushback and stop consuming values from
output is closed, then
map' will close
filter_map' input ~f returns a reader,
output, and repeatedly applies
input, with the results that aren't
None appearing in
values are not being consumed from
filter_map' will pushback and stop
consuming values from
output is closed, then
filter_map' will close
interleave inputs returns a reader
output, and, for each input, transfers batches
of values from that input to
transfer_id. Each input is transferred
output independently. So, batches of values from different inputs can be in
output simultaneously, but at most one batch at a time from any particular
input. The operation is complete when either all the
inputs produce EOF, or when
output is closed by the downstream consumer (in which case
interleave closes all
merge inputs ~cmp returns a reader,
output, that merges all the inputs. Assuming
that for each input, values are sorted according to the comparison function
values for each input will be transfered to
output and the values returned by
output will be sorted according to
hash a hash function suitable for pipes
Every pipe has a "size budget", which governs the pushback that is used to discourage writers from enqueueing arbitrarily large amounts of data. As long as the length of the pipe exceeds the size budget, writers will not be notified to do further writing. Whenever the length is less than or equal to the size budget, writers will be notified to continue.
Every pipe's initial size budget is zero.
set_size_budget t i changes the size budget of
i. Any nonnegative value is
show_debug_messages, if true will cause a message to be printed at the start of each
operation, showing the pipe and other arguments.
check_invariant, if true, will cause pipes' invariants to be checked at the start of