A buffered FIFO communication channel.
A pipe has a "writer" end and a "reader" end. The intent is that a writer feeds values into the pipe and then waits until it is notified that it should put more data in (referred to as "pushback").
Each pipe contains a buffer that is a queue of values that have been written to the
pipe but not yet read from the pipe. The length of the queue is not bounded; whenever
the pipe is written to, values are immediately enqueued. However, writers are
supposed to respect pushback from readers, either via the unit Deferred.t returned
by write calls or by explicitly calling pushback.
If a pipe is empty, then readers queue up, waiting for values to be written. As soon as values are written, if a reader is available to consume them, the values will be handed to the reader.
One can use downstream_flushed to get notified by a pipe when all prior writes have
been consumed by a reader.
There are distinct Reader and Writer modules and types, but all of the operations
on readers and writers are available directly from the Pipe module.
include sig ... end
val sexp_of_t : ('a ‑> Base.Sexp.t) ‑> ('phantom ‑> Base.Sexp.t) ‑> ('a, 'phantom) t ‑> Base.Sexp.t
include sig ... end
val sexp_of_pipe : ('a ‑> Base.Sexp.t) ‑> ('phantom ‑> Base.Sexp.t) ‑> ('a, 'phantom) pipe ‑> Base.Sexp.t
These provide reader- and writer-specific types for the base pipe type.
module Writer : sig ... end
module Reader : sig ... end
val create_reader : close_on_exception:bool ‑> ('a Writer.t ‑> unit Async_kernel.Deferred.t) ‑> 'a Reader.t
create_reader ~close_on_exception f creates a new pipe, applies f to its writer
end, and returns its reader end. create_reader closes the writer end when the
result of f becomes determined. If f raises, then the exception is raised
to the caller of create_reader. Whether or not create_reader closes the writer
end upon f raising is determined by close_on_exception.
Choosing ~close_on_exception:false is recommended, because normally closing the
write end of a pipe is taken to mean that the writer completed successfully. With
~close_on_exception:true, the caller will see the pipe closed, and an exception
will also be raised to the monitor in effect when create_reader was called. There is a
race between those two actions, which can easily lead to confusion or bugs.
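A minimal sketch of create_reader, assuming the Async library is opened for Pipe, return, and >>=. The name squares is hypothetical and used only for illustration.

```ocaml
open Async

(* [squares n] returns the reader end of a pipe that yields 0, 1, 4, ...,
   (n-1)^2. [create_reader] closes the writer end once the deferred returned
   by the callback is determined. [squares] is a hypothetical name. *)
let squares n : int Pipe.Reader.t =
  Pipe.create_reader ~close_on_exception:false (fun w ->
    let rec loop i =
      (* Stop early if the consumer has closed the pipe. *)
      if i >= n || Pipe.is_closed w
      then return ()
      else Pipe.write w (i * i) >>= fun () -> loop (i + 1)
    in
    loop 0)
```

Because each write waits on the pushback deferred, the producer only runs ahead of the consumer by what the size budget allows.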
val create_writer : ('a Reader.t ‑> unit Async_kernel.Deferred.t) ‑> 'a Writer.t
create_writer f is symmetric with create_reader. It creates a new
pipe, applies f to its reader end, and returns its writer end. create_writer
calls close_read when the result of f becomes determined. If f raises,
create_writer closes the pipe and raises the exception to the caller of
create_writer. create_writer closes on exception, unlike create_reader, because
closing the read end of a pipe is a signal to the writer that the consumer has
failed.
val init : ('a Writer.t ‑> unit Async_kernel.Deferred.t) ‑> 'a Reader.t
create () creates a new pipe. It is preferable to use create_reader or
create_writer instead of create, since they provide exception handling and
automatic closing of the pipe.
val of_list : 'a list ‑> 'a Reader.t
of_list l returns a closed pipe reader filled with the contents of l.
val singleton : 'a ‑> 'a Reader.t
singleton x returns a closed pipe reader filled with the single value x.
val unfold : init:'s ‑> f:('s ‑> ('a * 's) option Async_kernel.Deferred.t) ‑> 'a Reader.t
unfold ~init ~f returns a pipe that it fills with values of type 'a by repeatedly applying f
to values of the state type 's. When f returns None, the resulting pipe is
closed. unfold respects pushback on the resulting pipe. If f raises, then the
pipe is not closed.
For example, to create a pipe of natural numbers:
  Pipe.unfold ~init:0 ~f:(fun n -> return (Some (n, n+1)))
val of_sequence : 'a Core_kernel.Sequence.t ‑> 'a Reader.t
of_sequence sequence returns a pipe reader that gets filled with the elements of
sequence. of_sequence respects pushback on the resulting pipe.
to_sequence reader returns a sequence that can be consumed to extract values from
reader. If Wait_for d is returned, the consumer must wait for d to become
determined before pulling the next value. Repeatedly asking for the next value
without waiting on d will loop forever.
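val to_sequence : 'a Reader.t ‑> 'a to_sequence_elt Core_kernel.Sequence.t
val close : _ Writer.t ‑> unit
close t closes the write end of the pipe:
- Future write attempts fail, raising an exception.
- Reads that are blocked waiting for data at the time of the close unblock, producing `Eof.
- Future reads drain the data that was in the pipe at the time of the close; once the buffer is exhausted, subsequent reads immediately get `Eof.
Thus, after a pipe has been closed, reads never block.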
close is idempotent.
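The following sketch illustrates how reads behave after close, assuming Async is opened. The function name drain_after_close is hypothetical.

```ocaml
open Async

(* Write two values, close the write end, then read three times.
   The buffered values are still delivered; only then do reads see `Eof. *)
let drain_after_close () =
  let r, w = Pipe.create () in
  Pipe.write_without_pushback w 1;
  Pipe.write_without_pushback w 2;
  Pipe.close w;
  Pipe.read r >>= fun first ->
  Pipe.read r >>= fun second ->
  Pipe.read r >>| fun third ->
  first, second, third
```

Under these assumptions the three reads should yield the two buffered values followed by `Eof, and none of them blocks.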
val close_read : _ Reader.t ‑> unit
close_read t closes both the read and write ends of the pipe. It does everything
close does, and in addition:
- All pending flushes become determined with `Reader_closed.
- The pipe buffer is cleared.
- All subsequent reads get `Eof.
val is_closed : (_, _) t ‑> bool
is_closed t returns true iff close t or close_read t has been called.
val closed : (_, _) t ‑> unit Async_kernel.Deferred.t
closed t returns a deferred that becomes determined when close t or close_read t
is called.
module Flushed_result : sig ... end
Deferreds returned by upstream_flushed and downstream_flushed become determined
when all values written prior to the call have been consumed, or if the reader end of
the pipe is closed. The difference between "upstream" and "downstream" comes if one
has a chain of pipes that are linked (e.g., by Pipe.map):
P1 --> P2 --> P3
Calling downstream_flushed P2 ensures that everything in P2 has made it out of P3.
Calling upstream_flushed P2 ensures that everything in P1 has made it out of P3.
More generally, downstream_flushed starts at the current pipe and follows the chain
to the final downstream consumer(s). upstream_flushed follows the chain to the
initial upstream pipe(s), and then calls downstream_flushed.
For a pipe in isolation, "consumed" means "read from the pipe". However, for pipes
linked together with transfer or any function built from transfer, "consumed"
means "propagated all the way downstream through the chain and read from the final
pipe in the chain". Furthermore, for a pipe ultimately connected to an
Async.Writer, "consumed" means the OS write() system call has completed on the bytes
read from the final pipe in the chain.
The following Pipe functions automatically link their input and output pipes
together so that *_flushed on upstream pipes will propagate to downstream pipes:
transfer*, map*, filter_map*, filter, interleave, concat. There is no automatic linking with iter*; however, user code can customize the behavior of
flush functions using Consumer.
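As a sketch of the linking described above (assuming Async is opened; flushed_through_map is a hypothetical name), a flush requested on the upstream writer of a map should only complete once the mapped values have been read from the downstream pipe:

```ocaml
open Async

(* Link an upstream pipe to a downstream pipe with [Pipe.map], then ask the
   upstream writer for a downstream flush. The flush is not yet determined
   when requested, and completes after the mapped values have been read. *)
let flushed_through_map () =
  let r, w = Pipe.create () in
  let doubled = Pipe.map r ~f:(fun x -> 2 * x) in
  Pipe.write_without_pushback w 1;
  Pipe.write_without_pushback w 2;
  let flushed = Pipe.downstream_flushed w in
  let flushed_before_read = Deferred.is_determined flushed in
  Pipe.read_exactly doubled ~num_values:2 >>= fun _ ->
  flushed >>| fun result ->
  flushed_before_read, result
```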
val upstream_flushed : (_, _) t ‑> Flushed_result.t Async_kernel.Deferred.t
val downstream_flushed : (_, _) t ‑> Flushed_result.t Async_kernel.Deferred.t
module Consumer : sig ... end
val add_consumer : _ Reader.t ‑> downstream_flushed:(unit ‑> Flushed_result.t Async_kernel.Deferred.t) ‑> Consumer.t
add_consumer reader ~downstream_flushed creates a new consumer of reader, and
causes future calls to downstream_flushed reader to take this consumer into account.
Thereafter, Pipe.downstream_flushed reader will first ensure that values previously
written to reader have been read, then that they have been sent downstream by the
consumer that read them, and finally that they have been flushed downstream.
One should only supply the resulting consumer to read operations on reader. Using
a consumer created from one reader with another reader will raise an exception.
These operations apply to all values of type (_, _) t, that is, both readers and
writers.
The write operations return a deferred value that is determined when either (1) it is OK to write again to the pipe or (2) the pipe has been closed. This deferred is the data-producer's interface to the pipe pushback mechanism: it tells the producer when it should proceed after doing a write -- either to produce and write more data to the pipe, or to abandon production entirely. The pushback mechanism is just advisory: a producer task can, but typically should not, dump arbitrary amounts of data into a pipe even if there is no consumer draining it.
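A small sketch of the pushback deferred in action, assuming Async is opened and the pipe keeps its default size budget. The name pushback_demo is hypothetical.

```ocaml
open Async

(* With the default size budget, a single buffered value is enough to make
   the pipe push back: the deferred returned by [write] stays undetermined
   until a reader consumes the value. *)
let pushback_demo () =
  let r, w = Pipe.create () in
  let pushback = Pipe.write w "x" in
  let determined_before_read = Deferred.is_determined pushback in
  Pipe.read r >>= fun _ ->
  pushback >>| fun () ->
  determined_before_read
```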
Producers that write a sequence of values to a pipe should be aware that the consumers
who read from the pipe can close the pipe early -- that is, before the producer has
finished doing all of its writes. If this happens, further writes will raise an
exception. To avoid these errors, all writes must be atomically guarded by
is_closed tests. Thus, a typical writer loop should look like this:
  let countup hi w =                       (* Send the ints in range [0,hi) to writer w. *)
    let rec loop i =
      if i < hi && not (Pipe.is_closed w)  (* Guard write w/closed test. *)
      then Pipe.write w i >>> fun () ->    (* Do the write, then wait until the datum *)
        loop (i + 1)                       (* fits or the pipe is closed. *)
      else Pipe.close w                    (* No harm done if reader has already closed the pipe. *)
    in
    loop 0
If the pipe's consumer stops reading early and closes the pipe, countup won't error
out trying to write further values down the pipe: it will immediately wake up and
exit.
val pushback : 'a Writer.t ‑> unit Async_kernel.Deferred.t
pushback writer becomes determined when either writer has been closed or
the pipe can accept a new write.
val write : 'a Writer.t ‑> 'a ‑> unit Async_kernel.Deferred.t
write writer a enqueues a in writer, returning a pushback deferred, as described
above.
transfer_in writer ~from:q transfers the elements from q into writer, leaving
q empty, and returning a pushback deferred.
write_without_pushback and transfer_in_without_pushback are alternatives to
transfer_in and write that can be used when you don't care about the pushback
deferred. They add data to the pipe and return immediately.
The following equivalences hold:
  write t a = write_without_pushback t a; pushback t
  transfer_in t ~from = transfer_in_without_pushback t ~from; pushback t
If is_closed writer, then all of these functions raise.
val write_without_pushback : 'a Writer.t ‑> 'a ‑> unit
val transfer_in : 'a Writer.t ‑> from:'a Core_kernel.Queue.t ‑> unit Async_kernel.Deferred.t
val transfer_in_without_pushback : 'a Writer.t ‑> from:'a Core_kernel.Queue.t ‑> unit
val write_when_ready : 'a Writer.t ‑> f:(('a ‑> unit) ‑> 'b) ‑> [ `Closed | `Ok of 'b ] Async_kernel.Deferred.t
write_when_ready writer ~f waits until there is space available in the pipe, and
then calls f write, where write can be used by f to write a single value into
the pipe at a time. write_when_ready guarantees that the pipe is open when it calls
f, and hence that the writes will succeed, unless f itself closes the pipe.
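A brief sketch of write_when_ready, assuming Async is opened. The helper write_pair is hypothetical; it writes two values once the pipe is ready and reports `Closed instead of raising if the pipe has been closed.

```ocaml
open Async

(* [write_pair w a b] waits for space in the pipe, then writes [a] and [b]
   through the [write] callback supplied by [write_when_ready]. The result is
   [`Ok ()] if the writes happened, or [`Closed] if the pipe was closed. *)
let write_pair w a b =
  Pipe.write_when_ready w ~f:(fun write ->
    write a;
    write b)
```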
val write_if_open : 'a Writer.t ‑> 'a ‑> unit Async_kernel.Deferred.t
write_if_open w e is equivalent to:
  let x = e in
  if not (is_closed w) then (write w x) else (return ())
Note the difference in allocation and potential side effects when w is closed and
e is a complex expression.
write_without_pushback_if_open is the same as write_if_open, except it calls
write_without_pushback instead of write.
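val write_without_pushback_if_open : 'a Writer.t ‑> 'a ‑> unit
With two special exceptions, all read procedures have a best-effort/forward-progress semantics:
- Best effort: a read returns whatever is available right now, which might be less than was requested.
- Forward progress: if nothing is available, the read blocks until some data comes in (unless the pipe is at EOF). So the only time a read returns no data is at EOF.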
The best-effort semantics allows you to program in a style that processes data in big slabs, yet also moves data through your processing in as timely a way as possible.
The forward-progress semantics means that every call produces some data, so you can process an n-element input with at most n reads; you cannot burn an unbounded number of cycles "spinning" on empty-result "polling" calls (which, in a non-preemptive system like Async, could lock up the process).
The two exceptions to best-effort/forward-progress semantics are read_now, which
polls for data, thus abandoning the forward-progress guarantee, and read_exactly,
which loops until it has read the entire amount requested (or encountered EOF), thus
abandoning the best-effort guarantee of timeliness.
val read' : ?consumer:Consumer.t ‑> ?max_queue_length:int ‑> 'a Reader.t ‑> [ `Eof | `Ok of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t
read' pipe reads values available in the pipe, as soon as any value becomes
available. The resulting queue will satisfy 0 < Queue.length q <= max_queue_length.
read' raises if max_queue_length <= 0. The consumer is used to extend the
meaning of values being flushed (see the Consumer module above).
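To illustrate batch reads, here is a sketch assuming Async is opened and that Core.Queue is the queue module behind the returned batches. The name two_batches is hypothetical.

```ocaml
open Async

(* Read a bounded batch of at most two values, then read whatever remains.
   Each [`Ok] batch is a non-empty queue; [`Eof] is mapped to an empty list
   here purely for convenience. *)
let two_batches () =
  let r = Pipe.of_list [ 1; 2; 3; 4; 5 ] in
  let to_list = function
    | `Eof -> []
    | `Ok q -> Core.Queue.to_list q
  in
  Pipe.read' ~max_queue_length:2 r >>= fun first ->
  Pipe.read' r >>| fun rest ->
  to_list first, to_list rest
```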
val read : ?consumer:Consumer.t ‑> 'a Reader.t ‑> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.t
read pipe reads a single value from the pipe. The consumer is used to extend the
meaning of values being flushed (see the Consumer module above).
val read_at_most : ?consumer:Consumer.t ‑> 'a Reader.t ‑> num_values:int ‑> [ `Eof | `Ok of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t
read_at_most t ~num_values is read' t ~max_queue_length:num_values.
val read_exactly : ?consumer:Consumer.t ‑> 'a Reader.t ‑> num_values:int ‑> [ `Eof | `Fewer of 'a Core_kernel.Queue.t | `Exactly of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t
read_exactly r ~num_values reads exactly num_values items, unless EOF is
encountered. read_exactly performs a sequence of read_at_most operations, so
there is no guarantee that the queue of values it returns comprises a contiguous
segment of the written stream of values -- other readers might pick off elements
in-between read_exactly's atomic reads. read_exactly raises if num_values <= 0.
The consumer is used to extend the meaning of values being flushed (see the
Consumer module above).
val read_now' : ?consumer:Consumer.t ‑> ?max_queue_length:int ‑> 'a Reader.t ‑> [ `Eof | `Nothing_available | `Ok of 'a Core_kernel.Queue.t ]
read_now' reader reads values from reader that are immediately available. If the
result is `Ok q, the queue will satisfy 0 < Queue.length q <= max_queue_length. If
reader is closed and empty, read_now' returns `Eof. If reader is empty but not
closed, read_now' returns `Nothing_available. The consumer is used to extend the
meaning of values being flushed (see the Consumer module above).
val read_now : ?consumer:Consumer.t ‑> 'a Reader.t ‑> [ `Eof | `Nothing_available | `Ok of 'a ]
read_now is like read_now', except that it reads a single value rather than
everything that is available.
val read_now_at_most : ?consumer:Consumer.t ‑> 'a Reader.t ‑> num_values:int ‑> [ `Eof | `Nothing_available | `Ok of 'a Core_kernel.Queue.t ]
read_now_at_most t ~num_values is read_now' t ~max_queue_length:num_values.
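The three possible results of polling can be sketched as follows, assuming Async is opened. The name poll_states is hypothetical.

```ocaml
open Async

(* Poll a pipe in three states: open and empty, holding one value, and
   closed with nothing left to read. *)
let poll_states () =
  let r, w = Pipe.create () in
  let when_empty = Pipe.read_now r in
  Pipe.write_without_pushback w 42;
  let when_ready = Pipe.read_now r in
  Pipe.close w;
  let when_closed = Pipe.read_now r in
  when_empty, when_ready, when_closed
```

Since read_now never waits, a caller that loops on `Nothing_available without yielding to the scheduler gives up the forward-progress guarantee described earlier.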
val peek : 'a Reader.t ‑> 'a option
val clear : 'a Reader.t ‑> unit
clear reader consumes all of the values currently in reader, and all blocked
flushes become determined with `Ok.
val read_all : 'a Reader.t ‑> 'a Core_kernel.Queue.t Async_kernel.Deferred.t
read_all reader reads all the values from the pipe until it is closed. An
alternative name might be Reader.to_queue.
val values_available : _ Reader.t ‑> [ `Eof | `Ok ] Async_kernel.Deferred.t
values_available reader returns a deferred that becomes determined when there are
values in the pipe. If there are multiple readers (a rare situation), there is no
guarantee that some other reader hasn't become active because of ordinary Async
scheduling and removed some or all of the values between the time the result of
values_available becomes determined and the time something waiting upon that
result runs.
values_available is useful when one wants to choose on values being available in a
pipe, so that one can be sure not to remove values and drop them on the floor.
values_available is roughly equivalent to read' ~max_queue_length:0.
val read_choice : 'a Reader.t ‑> [ `Eof | `Ok of 'a | `Nothing_available ] Async_kernel.Deferred.choice
read_choice reader is:
  choice
    (values_available reader)
    (fun (_ : [ `Ok | `Eof ]) -> read_now reader)
read_choice consumes a value from reader iff the choice is taken. read_choice
exists to discourage the broken idiom:
  choice (read reader) (fun ...)
which is broken because it reads from reader even if the choice isn't taken.
`Nothing_available can only be returned if there is a race condition with one or
more other consumers.
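As a sketch of read_choice used with choose (assuming Async is opened; read_either is a hypothetical helper), the following takes a value from whichever of two pipes has one first, without consuming anything from the pipe whose choice is not taken:

```ocaml
open Async

(* [read_either a b] becomes determined with the result of reading from
   whichever pipe first has values available (or reaches EOF). The pipe
   whose choice is not taken is left untouched. *)
let read_either a b = choose [ Pipe.read_choice a; Pipe.read_choice b ]
```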
read_choice_single_consumer_exn reader [%here] is like read_choice reader, but it
raises in the case of `Nothing_available. It is intended to be used when reader
has no other consumers.
val read_choice_single_consumer_exn : 'a Reader.t ‑> Core_kernel.Source_code_position.t ‑> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.choice
Issues:
Scalar & batch sequence processing:
Each of the sequence functions (fold, iter, transfer, map) comes in two
versions: "scalar" and "batch" processing. The scalar version has the ordinary type
for f, which handles an element at a time in a non-deferred way. In the batch
version, f deals with a queue of elements from the pipe at a time, and can block,
which will cause pushback on writers due to elements not being consumed.
Early-close and functions that copy between pipes:
Some functions (transfer, map, filter_map, filter, interleave, concat, and
their primed, batch-processing variants) spawn a background task that copies data from
some upstream pipe to some downstream pipe, perhaps with some processing inserted
in between. These copying tasks finish under two circumstances. The standard,
"normal" case is when the copying task gets EOF from the upstream pipe -- there is no
more data to copy. In this case, the copying task closes the downstream pipe, if
necessary, and exits.
Somewhat less common is when the downstream consumer decides to stop reading early, while the upstream producer is still sending data to the copy task. (E.g., perhaps the consumer was searching its incoming stream for some value, and it found that value, so there's no need to search further.) In this case, the consumer closes its pipe to indicate it's done reading values. When the copy task discovers that its downstream pipe is closed, it propagates the close to the upstream producer by closing its pipe and stops processing.
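The early-close case can be sketched like this, assuming Async is opened. The function name is hypothetical; it closes the downstream end of a map and waits for the close to reach the upstream writer.

```ocaml
open Async

(* Close the read end of the mapped (downstream) pipe and wait until the
   copying task has propagated the close to the upstream pipe. *)
let upstream_closed_after_downstream_close () =
  let r, w = Pipe.create () in
  let mapped = Pipe.map r ~f:succ in
  Pipe.close_read mapped;
  Pipe.closed w >>| fun () -> Pipe.is_closed w
```

A producer looping on the upstream writer would notice this through is_closed or its pushback deferred and stop producing.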
val fold' : ?consumer:Consumer.t ‑> ?max_queue_length:int ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a Core_kernel.Queue.t ‑> 'accum Async_kernel.Deferred.t) ‑> 'accum Async_kernel.Deferred.t
fold' reader ~init ~f reads a batch of elements from reader, supplies them to f,
waits for f to finish, and then repeats. fold' finishes when the call to f on
the final batch of elements from reader finishes.
val fold : ?consumer:Consumer.t ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a ‑> 'accum Async_kernel.Deferred.t) ‑> 'accum Async_kernel.Deferred.t
fold reader ~init ~f folds over the elements of reader, consuming them as they
come in. fold finishes when the final call to f returns.
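A short sketch contrasting the deferred and non-deferred scalar folds, assuming Async is opened. The names sum and sum_deferred are hypothetical.

```ocaml
open Async

(* Sum a pipe of ints with a plain, non-blocking step function. *)
let sum r = Pipe.fold_without_pushback r ~init:0 ~f:( + )

(* The same sum, but with a step function that returns a deferred, so each
   step could wait on other work before the next element is consumed. *)
let sum_deferred r = Pipe.fold r ~init:0 ~f:(fun acc x -> return (acc + x))
```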
val fold_without_pushback : ?consumer:Consumer.t ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a ‑> 'accum) ‑> 'accum Async_kernel.Deferred.t
val iter' : ?consumer:Consumer.t ‑> ?continue_on_error:bool ‑> ?max_queue_length:int ‑> 'a Reader.t ‑> f:('a Core_kernel.Queue.t ‑> unit Async_kernel.Deferred.t) ‑> unit Async_kernel.Deferred.t
iter' reader ~f repeatedly applies f to batches of elements of reader, waiting
for each call to f to finish before continuing. The deferred returned by iter'
becomes determined when the call to f on the final batch of elements finishes.
~continue_on_error:true causes the iteration to continue even if f raises.
~consumer is used to extend the meaning of values being flushed (see the Consumer
module above).
val iter : ?consumer:Consumer.t ‑> ?continue_on_error:bool ‑> 'a Reader.t ‑> f:('a ‑> unit Async_kernel.Deferred.t) ‑> unit Async_kernel.Deferred.t
iter t ~f is a specialization of iter' that applies f to each element in the
batch, waiting for one call to f to finish before making the next call to f.
val iter_without_pushback : ?consumer:Consumer.t ‑> ?continue_on_error:bool ‑> ?max_iterations_per_job:int ‑> 'a Reader.t ‑> f:('a ‑> unit) ‑> unit Async_kernel.Deferred.t
iter_without_pushback t ~f applies f to each element in t, without giving f a
chance to pushback on the iteration continuing. If f raises on some element of t,
iter_without_pushback will not consume any further elements.
iter_without_pushback will not make more than max_iterations_per_job calls to f
in a single Async_job; this can be used to increase Async-scheduling fairness.
val transfer' : ?max_queue_length:int ‑> 'a Reader.t ‑> 'b Writer.t ‑> f:('a Core_kernel.Queue.t ‑> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) ‑> unit Async_kernel.Deferred.t
transfer' input output ~f repeatedly reads a batch of elements from input, applies
f to the batch, writes the result as a batch to output, and then waits on
pushback in output before continuing. transfer' finishes if input is closed
or output is closed. If output is closed, then transfer' closes input.
val transfer : 'a Reader.t ‑> 'b Writer.t ‑> f:('a ‑> 'b) ‑> unit Async_kernel.Deferred.t
transfer is like transfer', except that it processes one element at a time.
val transfer_id : ?max_queue_length:int ‑> 'a Reader.t ‑> 'a Writer.t ‑> unit Async_kernel.Deferred.t
transfer_id is a specialization of transfer' with f = Fn.id.
val map' : ?max_queue_length:int ‑> 'a Reader.t ‑> f:('a Core_kernel.Queue.t ‑> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) ‑> 'b Reader.t
map' input ~f returns a reader, output, and repeatedly applies f to batches of
elements from input, with the results appearing in output. If values are not
being consumed from output, map' will pushback and stop consuming values from
input. If output is closed, then map' will close input.
val folding_map : ?max_queue_length:int ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a ‑> 'accum * 'b) ‑> 'b Reader.t
folding_map is a version of map that threads an accumulator through calls to f.
val fold_map : ?max_queue_length:int ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a ‑> 'accum * 'b) ‑> 'b Reader.t
val filter_map' : ?max_queue_length:int ‑> 'a Reader.t ‑> f:('a ‑> 'b option Async_kernel.Deferred.t) ‑> 'b Reader.t
filter_map' input ~f returns a reader, output, and repeatedly applies f to
elements from input, with the results that aren't None appearing in output. If
values are not being consumed from output, filter_map' will pushback and stop
consuming values from input. If output is closed, then filter_map' will close
input.
val folding_filter_map : ?max_queue_length:int ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a ‑> 'accum * 'b option) ‑> 'b Reader.t
folding_filter_map is a version of filter_map that threads an accumulator through
calls to f.
val fold_filter_map : ?max_queue_length:int ‑> 'a Reader.t ‑> init:'accum ‑> f:('accum ‑> 'a ‑> 'accum * 'b option) ‑> 'b Reader.t
filter input ~f returns a reader, output, and copies to output each element from
input that satisfies the predicate f. If output is closed, then filter closes
input.
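A minimal sketch of a filtering and mapping stage, assuming Async is opened and using the scalar filter_map variant. The name even_squares is hypothetical.

```ocaml
open Async

(* Keep only the even inputs and square them. The result is a new reader;
   closing it early would propagate the close back to [r]. *)
let even_squares r =
  Pipe.filter_map r ~f:(fun x -> if x mod 2 = 0 then Some (x * x) else None)
```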
interleave inputs returns a reader, output, and, for each input, transfers batches
of values from that input to output, using transfer_id. Each input is transferred
to output independently. So, batches of values from different inputs can be in
flight to output simultaneously, but at most one batch at a time from any particular
input. The operation is complete when either all the inputs produce EOF, or when
output is closed by the downstream consumer (in which case interleave closes all
the inputs).
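The sketch below (assuming Async is opened; interleaved_sorted is a hypothetical helper) collects the output of interleave. Because each input is transferred independently, the relative order of values from different inputs is not something to rely on, so the example sorts the result before using it.

```ocaml
open Async

(* Merge two readers with [interleave], collect everything until EOF on all
   inputs, and sort, since cross-input ordering is not guaranteed. *)
let interleaved_sorted a b =
  Pipe.to_list (Pipe.interleave [ a; b ]) >>| List.sort compare
```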
merge inputs ~cmp returns a reader, output, that merges all the inputs. Assuming
that for each input, values are sorted according to the comparison function cmp,
values for each input will be transferred to output and the values returned by
output will be sorted according to cmp.
concat inputs returns a reader, output, with the values from each pipe in inputs
in sequence. concat closes output once it reaches EOF on the final input.
If output is closed, then concat closes all its inputs.
val to_stream_deprecated : 'a Reader.t ‑> 'a Async_kernel__.Async_stream.t
to_stream_deprecated reader returns a stream that reads everything from the pipe.
This function is deprecated because one should change the code that is consuming
a stream to instead consume from a pipe reader.
val of_stream_deprecated : 'a Async_kernel__.Async_stream.t ‑> 'a Reader.t
of_stream_deprecated reader returns a pipe that has one element for every element on
the stream. This function is deprecated because one should change the code that is
producing a stream to instead produce a pipe reader.
val drain : 'a Reader.t ‑> unit Async_kernel.Deferred.t
drain reader repeatedly reads values from reader and throws them away.
drain_and_count is like drain, except it also counts the number of values it
has read.
val drain_and_count : 'a Reader.t ‑> int Async_kernel.Deferred.t
val to_list : 'a Reader.t ‑> 'a list Async_kernel.Deferred.t
to_list input reads everything from input; on EOF, it produces the accumulated
list of these values.
val size_budget : (_, _) t ‑> int
Every pipe has a "size budget", which governs the pushback that is used to discourage writers from enqueueing arbitrarily large amounts of data. As long as the length of the pipe exceeds the size budget, writers will not be notified to do further writing. Whenever the length is less than or equal to the size budget, writers will be notified to continue.
Every pipe's initial size budget is zero.
val set_size_budget : (_, _) t ‑> int ‑> unit
set_size_budget t i changes the size budget of t to i. Any nonnegative value is
allowed.
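The effect of the size budget on pushback can be sketched as follows, assuming Async is opened. The name budget_demo is hypothetical.

```ocaml
open Async

(* With a size budget of 2, the first two writes leave the pipe within
   budget, so their pushback deferreds are already determined. The third
   write pushes the length to 3, above the budget, so its pushback deferred
   stays undetermined until a reader drains the pipe. *)
let budget_demo () =
  let _r, w = Pipe.create () in
  Pipe.set_size_budget w 2;
  let p1 = Pipe.write w 1 in
  let p2 = Pipe.write w 2 in
  let p3 = Pipe.write w 3 in
  Deferred.is_determined p1, Deferred.is_determined p2, Deferred.is_determined p3
```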
val show_debug_messages : bool Core_kernel.ref
show_debug_messages, if true, will cause a message to be printed at the start of
each operation, showing the pipe and other arguments.
val check_invariant : bool Core_kernel.ref
check_invariant, if true, will cause pipes' invariants to be checked at the start of
each operation.