Module Async_kernel__Pipe

A buffered FIFO communication channel.

A pipe has a "writer" end and a "reader" end. The intent is that a writer feeds values into the pipe and then waits until it is notified that it should put more data in (referred to as "pushback").

Each pipe contains a buffer that is a queue of values that have been written to the pipe but not yet read from the pipe. The length of the queue is not bounded; whenever the pipe is written to, values are immediately enqueued. However, writers are supposed to respect pushback from readers, either via the unit Deferred.t returned by write calls or by explicitly calling pushback.

If a pipe is empty, then readers queue up, waiting for values to be written. As soon as values are written, if a reader is available to consume them, the values will be handed to the reader.

One can use downstream_flushed to get notified by a pipe when all prior writes have been consumed by a reader.

There are distinct Reader and Writer modules and types, but all of the operations on readers and writers are available directly from the Pipe module.

For debugging your pipe usage you can use show_debug_messages, and also use set_info to attach some data to a pipe for identification purposes.

type ('a, 'phantom) t
val sexp_of_t : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('a, 'phantom) t -> Ppx_sexp_conv_lib.Sexp.t
type ('a, 'phantom) pipe = ('a, 'phantom) t
val sexp_of_pipe : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('a, 'phantom) pipe -> Ppx_sexp_conv_lib.Sexp.t

Reader and Writer modules

module Writer : sig ... end
module Reader : sig ... end

Creation

val create_reader : close_on_exception:bool -> ('a Writer.t -> unit Async_kernel.Deferred.t) -> 'a Reader.t

create_reader ~close_on_exception f creates a new pipe, applies f to its writer end, and returns its reader end. create_reader closes the writer end when the result of f becomes determined. If f raises, then the exception is raised to the caller of create_reader. Whether or not create_reader closes the writer end upon f raising is determined by close_on_exception.

Choosing ~close_on_exception:false is recommended, because normally closing the write end of a pipe is taken to mean that the writer completed successfully. With close_on_exception:true, the caller will both see the pipe closed and an exception will be raised to the monitor in effect when create_reader was called. There is a race between those two actions, which can easily lead to confusion or bugs.
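
As a minimal sketch of a producer built with create_reader, the following writes the integers 1..n and waits on the deferred returned by each write before continuing, which is how a writer respects pushback. It assumes the async library is installed and that open Async exposes this module as Pipe; count_up_to is a hypothetical name used only for illustration.

```ocaml
open Core
open Async

(* Hypothetical producer: emits 1..n, waiting for pushback after each write. *)
let count_up_to n : int Pipe.Reader.t =
  Pipe.create_reader ~close_on_exception:false (fun writer ->
    let rec loop i =
      if i > n
      then return () (* the writer end is closed once this becomes determined *)
      else
        (* [write] returns the pushback deferred; binding on it respects pushback. *)
        Pipe.write writer i >>= fun () -> loop (i + 1)
    in
    loop 1)
```

A caller could collect the values with Pipe.to_list (count_up_to 3), for instance inside Thread_safe.block_on_async_exn.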

val create_writer : ('a Reader.t -> unit Async_kernel.Deferred.t) -> 'a Writer.t

create_writer is symmetric with create_reader. It creates a new pipe, applies f to its reader end, and returns its writer end. create_writer calls close_read when the result of f becomes determined. If f raises, create_writer closes the pipe and raises the exception to the caller of create_writer. create_writer closes on exception, unlike create_reader, because closing the read end of a pipe is a signal to the writer that the consumer has failed.

val create : ?info:Core_kernel.Sexp.t -> unit -> 'a Reader.t * 'a Writer.t

create () creates a new pipe. It is preferable to use create_reader or create_writer instead of create, since they provide exception handling and automatic closing of the pipe. info is an arbitrary sexp displayed by sexp_of_t, for debugging purposes; see also set_info.

val empty : unit -> _ Reader.t

empty () returns a closed pipe reader with no contents.

val of_list : 'a list -> 'a Reader.t

of_list l returns a closed pipe reader filled with the contents of l.

val singleton : 'a -> 'a Reader.t

singleton x returns a closed pipe reader filled with the single value x.

val unfold : init:'s -> f:('s -> ('a * 's) option Async_kernel.Deferred.t) -> 'a Reader.t

unfold ~init ~f returns a pipe that it fills with 'as by repeatedly applying f to values of the state type 's. When f returns None, the resulting pipe is closed. unfold respects pushback on the resulting pipe. If f raises, then the pipe is not closed.

For example, to create a pipe of natural numbers:

Pipe.unfold ~init:0 ~f:(fun n -> return (Some (n, n+1)))

val of_sequence : 'a Core_kernel.Sequence.t -> 'a Reader.t

of_sequence sequence returns a pipe reader that gets filled with the elements of sequence. of_sequence respects pushback on the resulting pipe.

type 'a to_sequence_elt =
| Value of 'a
| Wait_for : _ Async_kernel.Deferred.t -> _ to_sequence_elt

to_sequence reader returns a sequence that can be consumed to extract values from reader. If Wait_for d is returned, the consumer must wait for d to become determined before pulling the next value. Repeatedly asking for the next value without waiting on d will infinite loop.

val to_sequence : 'a Reader.t -> 'a to_sequence_elt Core_kernel.Sequence.t

Closing

val close : _ Writer.t -> unit

close t closes the write end of the pipe:

  • Future write attempts will fail, raising an exception.
  • If, at the time of the close, there are reads blocked waiting for data, these reads will unblock, producing `Eof.
  • Future read attempts will drain the data that was in the pipe at the time of the close, until the pipe's buffer has been exhausted; subsequent reads will immediately get `Eof.

Thus, after a pipe has been closed, reads never block.

close is idempotent.
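
The draining behaviour described above can be sketched as follows: two values are buffered, the writer is closed, and three reads are issued. This assumes the async library is installed and that open Async exposes this module as Pipe; read_after_close is a hypothetical name.

```ocaml
open Async

(* Hypothetical demo: reads after [close] drain the buffer, then yield `Eof. *)
let read_after_close () =
  let reader, writer = Pipe.create () in
  Pipe.write_without_pushback writer 1;
  Pipe.write_without_pushback writer 2;
  Pipe.close writer;
  Pipe.read reader >>= fun first ->   (* `Ok 1 *)
  Pipe.read reader >>= fun second ->  (* `Ok 2 *)
  Pipe.read reader >>| fun third ->   (* `Eof: buffer exhausted, writer closed *)
  first, second, third
```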

val close_read : _ Reader.t -> unit

close_read t closes both the read and write ends of the pipe. It does everything close does, and in addition:

  • all pending flushes become determined with `Reader_closed.
  • the pipe buffer is cleared.
  • all subsequent reads will get `Eof.

val is_closed : (_, _) t -> bool

is_closed t returns true iff close t or close_read t has been called.

val closed : (_, _) t -> unit Async_kernel.Deferred.t

closed t returns a deferred that becomes determined when close t or close_read t is called.

Flushing

module Flushed_result : sig ... end
val upstream_flushed : (_, _) t -> Flushed_result.t Async_kernel.Deferred.t
val downstream_flushed : (_, _) t -> Flushed_result.t Async_kernel.Deferred.t
module Consumer : sig ... end
val add_consumer : _ Reader.t -> downstream_flushed:(unit -> Flushed_result.t Async_kernel.Deferred.t) -> Consumer.t

add_consumer reader ~downstream_flushed creates a new consumer of reader, and causes future calls to downstream_flushed reader to take this consumer into account. Thereafter, Pipe.downstream_flushed reader will first ensure that values previously written to reader have been read, then that they have been sent downstream by the consumer that read them, and finally that they have been flushed downstream.

One should only supply the resulting consumer to read operations on reader. Using a consumer created from one reader with another reader will raise an exception.

Generic pipe operations

val length : (_, _) t -> int

length t returns the number of elements currently queued in t.

val is_empty : (_, _) t -> bool

is_empty t is true iff there are no values in the pipe.

Writing

val pushback : 'a Writer.t -> unit Async_kernel.Deferred.t

pushback writer becomes determined when either writer has been closed or the pipe can accept a new write.

val write : 'a Writer.t -> 'a -> unit Async_kernel.Deferred.t

write writer a enqueues a in writer, returning a pushback deferred, as described above.

transfer_in writer ~from:q transfers the elements from q into writer, leaving q empty, and returning a pushback deferred.

write_without_pushback and transfer_in_without_pushback are alternatives to transfer_in and write that can be used when you don't care about the pushback deferred. They add data to the pipe and return immediately.

The following equivalences hold:

  • write t a = write_without_pushback t a; pushback t
  • transfer_in t ~from = transfer_in_without_pushback t ~from; pushback t

If is_closed writer, then all of these functions raise.

val write_without_pushback : 'a Writer.t -> 'a -> unit
val transfer_in : 'a Writer.t -> from:'a Core_kernel.Queue.t -> unit Async_kernel.Deferred.t
val transfer_in_without_pushback : 'a Writer.t -> from:'a Core_kernel.Queue.t -> unit
val write_when_ready : 'a Writer.t -> f:(('a -> unit) -> 'b) -> [ `Closed | `Ok of 'b ] Async_kernel.Deferred.t

write_when_ready writer ~f waits until there is space available in the pipe, and then calls f write, where write can be used by f to write a single value into the pipe at a time. write_when_ready guarantees that the pipe is open when it calls f, and hence that the writes will succeed, unless f itself closes the pipe.
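
A minimal sketch of write_when_ready, writing two values once the pipe is ready to accept them. It assumes the async library is installed and that open Async exposes this module as Pipe; send_pair is a hypothetical helper.

```ocaml
open Async

(* Hypothetical helper: waits until the pipe can accept a write, then writes
   both halves of the pair. Yields `Closed if the writer was already closed. *)
let send_pair writer (a, b) =
  Pipe.write_when_ready writer ~f:(fun write ->
    (* [write] enqueues one value per call; the pipe is open at this point. *)
    write a;
    write b)
```

The result is `Ok () when both values were enqueued, so callers can distinguish a closed pipe without catching an exception.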

val write_if_open : 'a Writer.t -> 'a -> unit Async_kernel.Deferred.t

write_if_open w e is equivalent to:

let x = e in
if not (is_closed w) then (write w x) else (return ()) 

Note the difference in allocation and potential side effects when w is closed and e is a complex expression.

write_without_pushback_if_open is the same as write_if_open, except it calls write_without_pushback instead of write.

val write_without_pushback_if_open : 'a Writer.t -> 'a -> unit

Reading

val read' : ?consumer:Consumer.t -> ?max_queue_length:int -> 'a Reader.t -> [ `Eof | `Ok of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t

read' pipe reads values available in the pipe, as soon as any value becomes available. The resulting queue will satisfy 0 < Queue.length q <= max_queue_length. read' raises if max_queue_length <= 0. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).

val read : ?consumer:Consumer.t -> 'a Reader.t -> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.t

read pipe reads a single value from the pipe. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).

val read_exactly : ?consumer:Consumer.t -> 'a Reader.t -> num_values:int -> [ `Eof | `Fewer of 'a Core_kernel.Queue.t | `Exactly of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t

read_exactly r ~num_values reads exactly num_values items, unless EOF is encountered. read_exactly performs a sequence of read_at_most operations, so there is no guarantee that the queue of values it returns comprises a contiguous segment of the written stream of values: other readers might pick off elements in between read_exactly's atomic reads. read_exactly raises if num_values <= 0. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).

val read_now' : ?consumer:Consumer.t -> ?max_queue_length:int -> 'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a Core_kernel.Queue.t ]

read_now' reader reads values from reader that are immediately available. If reader is closed and its buffer is empty, read_now' returns `Eof. If reader is open but empty, read_now' returns `Nothing_available. Otherwise, `Ok q is returned, and the resulting queue will satisfy 0 < Queue.length q <= max_queue_length. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).

val read_now : ?consumer:Consumer.t -> 'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a ]

read_now is like read_now', except that it reads a single value rather than everything that is available.
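
The three possible results of read_now can be illustrated with a short synchronous sketch. It assumes the async library is installed and that open Async exposes this module as Pipe; read_now_outcomes is a hypothetical name.

```ocaml
open Async

(* Hypothetical demo of the three outcomes of [read_now] on one pipe. *)
let read_now_outcomes () =
  let reader, writer = Pipe.create () in
  (* Open and empty: nothing to read yet. *)
  let on_empty = Pipe.read_now reader in
  Pipe.write_without_pushback writer "x";
  (* One buffered value: it is returned immediately. *)
  let with_value = Pipe.read_now reader in
  Pipe.close writer;
  (* Closed with an empty buffer: end of file. *)
  let after_close = Pipe.read_now reader in
  on_empty, with_value, after_close
```

No deferred is involved, so this can be called outside of any Async job.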

val peek : 'a Reader.t -> 'a option
val clear : 'a Reader.t -> unit

clear reader consumes all of the values currently in reader, and all blocked flushes become determined with `Ok.

val read_all : 'a Reader.t -> 'a Core_kernel.Queue.t Async_kernel.Deferred.t

read_all reader reads all the values from the pipe until it is closed. An alternative name might be Reader.to_queue.

val values_available : _ Reader.t -> [ `Eof | `Ok ] Async_kernel.Deferred.t

values_available reader returns a deferred that becomes determined when there are values in the pipe. If there are multiple readers (a rare situation), there is no guarantee that some other reader hasn't become active because of ordinary Async scheduling and removed some or all of the values between the time the result of values_available becomes determined and the time something waiting upon that result runs.

values_available is useful when one wants to choose on values being available in a pipe without removing values and then dropping them on the floor if the choice is not taken.

values_available is roughly what read' ~max_queue_length:0 would do if that call were allowed (read' itself raises if max_queue_length <= 0).

val read_choice : 'a Reader.t -> [ `Eof | `Ok of 'a | `Nothing_available ] Async_kernel.Deferred.Choice.t

read_choice reader is:

choice
  (values_available reader)
  (fun (_ : [ `Ok | `Eof ]) -> read_now reader) 

read_choice consumes a value from reader iff the choice is taken. read_choice exists to discourage the broken idiom:

choice (read reader) (fun ...) 

which is broken because it reads from reader even if the choice isn't taken. `Nothing_available can only be returned if there is a race condition with one or more other consumers.
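
A minimal sketch of the safe idiom, written with the expansion given above so that it can be combined with another choice of a different result type. It assumes the async library is installed and that open Async exposes this module as Pipe along with choose and choice; read_unless and the `Read and `Stopped tags are hypothetical names.

```ocaml
open Async

(* Hypothetical helper: reads one value from [reader] unless [stop] becomes
   determined first. A value is removed from the pipe only if the first
   choice is the one taken. *)
let read_unless reader ~(stop : unit Deferred.t) =
  choose
    [ choice (Pipe.values_available reader) (fun (_ : [ `Ok | `Eof ]) ->
        `Read (Pipe.read_now reader))
    ; choice stop (fun () -> `Stopped)
    ]
```

When stop wins, the pipe's contents are left untouched, which is exactly what choice (read reader) fails to guarantee.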

read_choice_single_consumer_exn reader [%here] is like read_choice reader, but it raises in the case of `Nothing_available. It is intended to be used when reader has no other consumers.

val read_choice_single_consumer_exn : 'a Reader.t -> Core_kernel.Source_code_position.t -> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.Choice.t

Sequence functions

module Flushed : sig ... end
val fold' : ?flushed:Flushed.t -> ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a Core_kernel.Queue.t -> 'accum Async_kernel.Deferred.t) -> 'accum Async_kernel.Deferred.t

fold' reader ~init ~f reads a batch of elements from reader, supplies them to f, waits for f to finish, and then repeats. fold' finishes when the call to f on the final batch of elements from reader finishes.
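
As a minimal sketch, fold' can sum a pipe of integers one batch at a time. It assumes the async and core libraries are installed, that open Async exposes this module as Pipe, and that Queue is Core's queue module; sum_batches is a hypothetical name.

```ocaml
open Core
open Async

(* Hypothetical helper: adds up every integer read from [reader].
   Each call to [f] receives one batch as a queue. *)
let sum_batches (reader : int Pipe.Reader.t) : int Deferred.t =
  Pipe.fold' reader ~init:0 ~f:(fun acc batch ->
    return (Queue.fold batch ~init:acc ~f:( + )))
```

The returned deferred becomes determined only after the writer end is closed and the last batch has been folded in.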

val fold : ?flushed:Flushed.t -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum Async_kernel.Deferred.t) -> 'accum Async_kernel.Deferred.t
val fold_without_pushback : ?consumer:Consumer.t -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum) -> 'accum Async_kernel.Deferred.t
val iter' : ?continue_on_error:bool -> ?flushed:Flushed.t -> ?max_queue_length:int -> 'a Reader.t -> f:('a Core_kernel.Queue.t -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t

iter' reader ~f repeatedly applies f to batches of elements of reader, waiting for each call to f to finish before continuing. The deferred returned by iter' becomes determined when the call to f on the final batch of elements finishes. ~continue_on_error:true causes the iteration to continue even if f raises.

~flushed:When_value_processed means values in batch b are flushed only after the deferred returned by f b becomes determined.

val iter : ?continue_on_error:bool -> ?flushed:Flushed.t -> 'a Reader.t -> f:('a -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t

iter t ~f is a specialization of iter' that applies f to each element in the batch, waiting for one call to f to finish before making the next call to f.

val iter_without_pushback : ?consumer:Consumer.t -> ?continue_on_error:bool -> ?max_iterations_per_job:int -> 'a Reader.t -> f:('a -> unit) -> unit Async_kernel.Deferred.t

iter_without_pushback t ~f applies f to each element in t, without giving f a chance to pushback on the iteration continuing. If f raises on some element of t, iter_without_pushback will not consume any further elements. iter_without_pushback will not make more than max_iterations_per_job calls to f in a single Async_job; this can be used to increase Async-scheduling fairness.

val transfer' : ?max_queue_length:int -> 'a Reader.t -> 'b Writer.t -> f:('a Core_kernel.Queue.t -> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t

transfer' input output ~f repeatedly reads a batch of elements from input, applies f to the batch, writes the result as a batch to output, and then waits on pushback in output before continuing. transfer' finishes if input is closed or output is closed. If output is closed, then transfer' closes input. Use ~max_queue_length:1 to cause elements to appear on the output pipe as soon as they are processed, without having to wait for the entire queue.

val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Async_kernel.Deferred.t

transfer is like transfer', except that it processes one element at a time.

val transfer_id : ?max_queue_length:int -> 'a Reader.t -> 'a Writer.t -> unit Async_kernel.Deferred.t

transfer_id is a specialization of transfer' with f = Fn.id.

val map' : ?max_queue_length:int -> 'a Reader.t -> f:('a Core_kernel.Queue.t -> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) -> 'b Reader.t

map' input ~f returns a reader, output, and repeatedly applies f to batches of elements from input, with the results appearing in output. If values are not being consumed from output, map' will pushback and stop consuming values from input. If output is closed, then map' will close input. Use ~max_queue_length:1 to cause elements to appear on the output pipe as soon as they are processed, without having to wait for the entire queue.

val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t

map is like map', except that it processes one element at a time.

val folding_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b) -> 'b Reader.t

folding_map is a version of map that threads an accumulator through calls to f.
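
For instance, a running total can be expressed with folding_map by returning the new accumulator both as the next state and as the output element. This is a sketch assuming the async library is installed and that open Async exposes this module as Pipe; running_totals is a hypothetical name.

```ocaml
open Async

(* Hypothetical helper: maps 1, 2, 3 to the running totals 1, 3, 6. *)
let running_totals (reader : int Pipe.Reader.t) : int Pipe.Reader.t =
  Pipe.folding_map reader ~init:0 ~f:(fun total x ->
    let total = total + x in
    (* First component: next accumulator. Second: value sent downstream. *)
    total, total)
```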

val filter_map' : ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option Async_kernel.Deferred.t) -> 'b Reader.t

filter_map' input ~f returns a reader, output, and repeatedly applies f to elements from input, with the results that aren't None appearing in output. If values are not being consumed from output, filter_map' will pushback and stop consuming values from input. If output is closed, then filter_map' will close input. filter_map' processes elements in batches as per max_queue_length; in a single batch, all outputs will propagate to the result only when all inputs have been processed.

val filter_map : ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option) -> 'b Reader.t

filter_map is a specialized version of filter_map'.

val folding_filter_map' : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> ('accum * 'b option) Async_kernel.Deferred.t) -> 'b Reader.t

folding_filter_map' is a version of filter_map' that threads an accumulator through calls to f. Like filter_map', folding_filter_map' processes elements in batches as per max_queue_length; in a single batch, all outputs will propagate to the result only when all inputs have been processed.

val folding_filter_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b option) -> 'b Reader.t

folding_filter_map is a specialized version of folding_filter_map'.

val filter : 'a Reader.t -> f:('a -> bool) -> 'a Reader.t

filter input ~f returns a reader, output, and copies to output each element from input that satisfies the predicate f. If output is closed, then filter closes input.

val interleave : 'a Reader.t list -> 'a Reader.t

interleave inputs returns a reader, output, and, for each input, transfers batches of values from that input to output, using transfer_id. Each input is transferred to output independently. So, batches of values from different inputs can be in flight to output simultaneously, but at most one batch at a time from any particular input. The operation is complete when either all the inputs produce EOF, or when output is closed by the downstream consumer (in which case interleave closes all the inputs).

val interleave_pipe : 'a Reader.t Reader.t -> 'a Reader.t
val merge : 'a Reader.t list -> compare:('a -> 'a -> int) -> 'a Reader.t

merge inputs ~compare returns a reader, output, that merges all the inputs. Assuming that for each input, values are sorted according to the comparison function compare, values for each input will be transferred to output and the values returned by output will be sorted according to compare.
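
A minimal sketch of merging already-sorted integer pipes. It assumes the async and core libraries are installed, that open Async exposes this module as Pipe, and that Int.compare is Core's integer comparison; merge_sorted is a hypothetical name.

```ocaml
open Core
open Async

(* Hypothetical helper: each input must already be sorted in ascending order. *)
let merge_sorted (inputs : int Pipe.Reader.t list) : int Pipe.Reader.t =
  Pipe.merge inputs ~compare:Int.compare
```

For example, merging Pipe.of_list [ 1; 4; 7 ] with Pipe.of_list [ 2; 3; 9 ] yields the values 1, 2, 3, 4, 7, 9 in that order.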

val concat : 'a Reader.t list -> 'a Reader.t

concat inputs returns a reader, output, with the values from each pipe in inputs in sequence. concat closes output once it reaches EOF on the final input. If output is closed, then concat closes all its inputs.

val concat_pipe : 'a Reader.t Reader.t -> 'a Reader.t

concat_pipe is like concat, but it takes a pipe of inputs instead of a list, and closes the input pipe when the output pipe is closed.

val fork : 'a Reader.t -> pushback_uses:[ `Both_consumers | `Fast_consumer_only ] -> 'a Reader.t * 'a Reader.t

fork input returns a pair of readers and transfers each of the values in input into both of the returned readers. It closes input early if both of the readers are closed early.

If pushback_uses = `Both_consumers, then fork waits for pushback on both readers when writing. If one of the readers is not read from or is slow to be read from, it may block the other from receiving data. Beware of possible deadlocks in downstream code due to blocking on reading too many elements from one before reading the other.

If pushback_uses = `Fast_consumer_only, then fork waits for pushback only on the faster of the two readers when writing. In this case the slow reader cannot block the faster one, but fork could be forced to buffer arbitrarily many elements. Beware of unbounded resource usage in downstream code where one reader might fall behind.

Note that upstream_flushed will not work with the pipes returned by fork.
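
To illustrate the deadlock warning for `Both_consumers, the sketch below reads both forked pipes concurrently with Deferred.both; reading one to completion before starting on the other could stall, because fork waits for pushback on both. It assumes the async library is installed and that open Async exposes this module as Pipe; collect_both is a hypothetical name.

```ocaml
open Async

(* Hypothetical helper: forks [input] and collects both copies concurrently. *)
let collect_both (input : 'a Pipe.Reader.t) : ('a list * 'a list) Deferred.t =
  let left, right = Pipe.fork input ~pushback_uses:`Both_consumers in
  (* Both readers are drained at the same time, so neither blocks the other. *)
  Deferred.both (Pipe.to_list left) (Pipe.to_list right)
```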

val to_stream_deprecated : 'a Reader.t -> 'a Async_kernel__.Async_stream.t

to_stream_deprecated reader returns a stream that reads everything from the pipe. This function is deprecated because one should change the code that is consuming a stream to instead consume from a pipe reader.

val of_stream_deprecated : 'a Async_kernel__.Async_stream.t -> 'a Reader.t

of_stream_deprecated stream returns a pipe that has one element for every element on the stream. This function is deprecated because one should change the code that is producing a stream to instead produce a pipe reader.

val drain : 'a Reader.t -> unit Async_kernel.Deferred.t

drain reader repeatedly reads values from reader and throws them away.

drain_and_count is like drain, except it also counts the number of values it has read.

val drain_and_count : 'a Reader.t -> int Async_kernel.Deferred.t
val to_list : 'a Reader.t -> 'a list Async_kernel.Deferred.t

to_list input reads everything from input; on EOF, it produces the accumulated list of these values.

Miscellaneous

val hash : (_, _) t -> int

hash is a hash function based on the internal id of the pipe.

val equal : ('a, 'b) t -> ('a, 'b) t -> bool

equal on pipes is physical equality.

val compare : (_, _) t -> (_, _) t -> int

compare on pipes is based on the internal id of the pipe.

Size budget

val size_budget : (_, _) t -> int

Every pipe has a "size budget", which governs the pushback that is used to discourage writers from enqueueing arbitrarily large amounts of data. As long as the length of the pipe exceeds the size budget, writers will not be notified to do further writing. Whenever the length is less than or equal to the size budget, writers will be notified to continue.

Every pipe's initial size budget is zero.

val set_size_budget : (_, _) t -> int -> unit

set_size_budget t i changes the size budget of t to i. Any nonnegative value is allowed.
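
The relationship between pipe length, size budget, and pushback can be sketched by checking whether the pushback deferred is already determined after a number of writes. This assumes the async library is installed, that open Async exposes this module as Pipe, and that pushback is updated synchronously on each write (an assumption about the implementation, not something stated above); pushback_after_writes is a hypothetical name.

```ocaml
open Async

(* Hypothetical probe: true iff a writer would be told to continue after
   enqueueing [writes] values into a pipe whose size budget is [budget]. *)
let pushback_after_writes ~budget ~writes =
  let _reader, writer = Pipe.create () in
  Pipe.set_size_budget writer budget;
  for i = 1 to writes do
    Pipe.write_without_pushback writer i
  done;
  (* Determined when length <= budget; undetermined when length > budget. *)
  Deferred.is_determined (Pipe.pushback writer)
```

With the default budget of zero, a single unread value is enough to make the writer wait; raising the budget to 2 lets two values sit in the buffer before pushback applies.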

Debugging

val show_debug_messages : bool Core_kernel.ref

show_debug_messages, if true, will cause a message to be printed at the start of each operation, showing the pipe and other arguments.

val check_invariant : bool Core_kernel.ref

check_invariant, if true, will cause pipes' invariants to be checked at the start of each operation.

val set_info : (_, _) t -> Core_kernel.Sexp.t -> unit

set_info updates t's info field, which is displayed by sexp_of_t, and thus in debugging messages.