Module Pipe

module Pipe: Pipe

type ('a, 'phantom) t 
val sexp_of_t : ('a -> Sexplib.Sexp.t) ->
('phantom -> Sexplib.Sexp.t) -> ('a, 'phantom) t -> Sexplib.Sexp.t
type ('a, 'phantom) pipe = ('a, 'phantom) t 
val sexp_of_pipe : ('a -> Sexplib.Sexp.t) ->
('phantom -> Sexplib.Sexp.t) -> ('a, 'phantom) pipe -> Sexplib.Sexp.t

Reader and Writer modules



These provide reader- and writer-specific types for the base pipe type.
module Writer: sig .. end
module Reader: sig .. end

Creation


val create : unit -> 'a Reader.t * 'a Writer.t
create () creates a new pipe.
val of_list : 'a list -> 'a Reader.t
of_list l returns a closed pipe reader filled with the contents of l.

Closing


val close : 'a Writer.t -> unit
close t closes the write end of the pipe:

Thus, after a pipe has been closed, reads never block.

Close is idempotent.

close_read t closes both the read and write ends of the pipe. It does everything close does, and in addition:


val close_read : 'a Reader.t -> unit
val is_closed : ('a, 'b) t -> bool
is_closed t returns true iff close t or close_read t has been called.
val closed : ('a, 'b) t -> unit Deferred.t
closed t returns a deferred that becomes determined when close t or close_read t is called.

Generic pipe operations



These operations apply to all values of type (_, _) t, that is, both readers and writers.
val flushed : ('a, 'b) t -> [ `Ok | `Reader_closed ] Deferred.t
flushed t becomes determined when all values written prior to the call to flushed have been removed from the pipe and handed to a reader, or if the reader end of the pipe is closed.
val length : ('a, 'b) t -> int
length t returns the number of elements currently queued in t

Writing



The write operations return a deferred value that is determined when either (1) it is OK to write again to the pipe or (2) the pipe has been closed. This deferred is the data-producer's interface to the pipe pushback mechanism: it tells the producer when it should proceed after doing a write -- either to produce and write more data to the pipe, or to abandon production entirely. The pushback mechansim is just advisory: a producer task can, but typically should not, dump arbitrary amounts of data into a pipe even if there is no consumer draining the pipe.

Producers that write a sequence of values to a pipe should be aware that the consumers who read from the pipe can close the pipe early -- that is, before the producer has finished doing all of its writes. If this happens, further writes will raise an exception. To avoid these errors, all writes must be atomically guarded by is_closed tests. Thus, a typical writer loop should look like this:

    fun countup hi w = (* Send the ints in range [0,hi) to writer W. *)
      let rec loop i =
        if i < hi and not (is_closed w) then (* Guard write w/closed test. *)
          write i w >>>            (* Do the write then block until datum     *)
          fun () -> loop (i+1)     (*   fits or the pipe is closed.           *)
        else close w (* No harm done if reader has already closed the pipe.*)
      in
      loop 0
    

If the pipe's consumer stops reading early and closes the pipe, countup won't error out trying to write further values down the pipe: it will immediately wake up and exit.

val pushback : 'a Writer.t -> unit Deferred.t
pushback writer becomes determined when either writer has been closed or the pipe is empty.
val write' : 'a Writer.t -> 'a Core.Std.Queue.t -> unit Deferred.t
write' writer q transfers the elements from q into the pipe, leaving q empty. write' returns a pushback deferred, as described above. Writing to a closed pipe raises.

write writer v is equivalent to write' writer (Queue.singleton v).

val write : 'a Writer.t -> 'a -> unit Deferred.t
val with_write : 'a Writer.t ->
f:(('a -> unit) -> 'b) -> [ `Closed | `Ok of 'b ] Deferred.t
with_write writer ~f waits until there is space available in the pipe, and then calls f write, where write can be used by f to write a single value into the pipe at a time. with_write guarantees that the pipe is open when it calls f, and hence that the writes will succeed, unless f itself closes the pipe.

Reading



With two special exceptions, all read procedures have a best-effort/forward-progress semantics:

The best-effort semantics allows you to program in a style that processes data in big slabs, yet also moves data through your processing in as timely a way as possible.

The forward-progress semantics means that every call produces some data, so you can process an n-element input with at most n reads; you cannot burn an unbounded number of cycles "spinning" doing an unbounded number of empty-result "polling" calls (which, in a non-preemptive system like Async could lock up the process).

The two exceptions to best-effort/forward-progress semantics are read_now, which polls for data, thus abandoning the forward-progress guarantee, and read_exactly, which loops until it has read the entire amount requested (or encountered EOF), thus abandoning the best-effort guarantee of timeliness.

val read' : 'a Reader.t -> [ `Eof | `Ok of 'a Core.Std.Queue.t ] Deferred.t
read' pipe reads all of the values available in the pipe, as soon as any value becomes available. The resulting queue will satisfy Q.length q > 0.
val read : 'a Reader.t -> [ `Eof | `Ok of 'a ] Deferred.t
read pipe reads a single value from the pipe.
val read_at_most : 'a Reader.t ->
num_values:int -> [ `Eof | `Ok of 'a Core.Std.Queue.t ] Deferred.t
read_at_most r ~num_values reads up to num_values values from the pipe's currently available data, blocking if the pipe is empty. The resulting queue will satisfy 0 < Queue.length q <= num_values. read_at_most raises if num_values <= 0.
val read_exactly : 'a Reader.t ->
num_values:int ->
[ `Eof | `Exactly of 'a Core.Std.Queue.t | `Fewer of 'a Core.Std.Queue.t ]
Deferred.t
read_exactly r ~num_values reads exactly num_values items, unless EOF is encountered. read_exactly performs a sequence of read_at_most operations, so there is no guarantee that the queue of values it returns comprise a contiguous segment of the written stream of values -- other readers might pick off elements in-between read_exactly's atomic reads. read_exactly raises if num_values <= 0.
val read_now : 'a Reader.t ->
[ `Eof | `Nothing_available | `Ok of 'a Core.Std.Queue.t ]
read_now reader reads all of the values from reader that are immediately available. The resulting queue will satisfy Q.length q > 0. If reader is closed, read_now returns `Eof. If reader is empty, read_now returns `Nothing_available. read_now has the danger of permitting the computation to "spin" doing empty reads; it is only useful in exotic circumstances.
val clear : 'a Reader.t -> unit
clear reader consumes all of the values currently in reader, and all blocked flushes become determined with `Ok.
val read_all : 'a Reader.t -> 'a Core.Std.Queue.t Deferred.t
read_all reader reads all the values from the pipe until it is closed. An alternative name might be Reader.to_queue.
val values_available : 'a Reader.t -> [ `Eof | `Ok ] Deferred.t
values_available reader returns a deferred that becomes determined when there are values in the pipe. If there are multiple readers (a rare situation), there is no guarantee, that some other reader hasn't become active because of ordinary async scheduling and removed some or all of the values between the time the result of values_available becomes determined and the time something waiting upon that result runs.

values_available is useful when one wants to choose on values being available in a pipe, so that one can be sure and not remove values and drop them on the floor.

values_available is roughly equivalent to read_at_most ~num_values:0.


Sequence functions


val fold' : 'a Reader.t ->
init:'b -> f:('b -> 'a Core.Std.Queue.t -> 'b Deferred.t) -> 'b Deferred.t
fold' reader ~init ~f reads a batch of elements from reader, supplies them to f, waits for f to finish, and then repeats. fold' finishes when the call to f on the final batch of elements from reader finishes.

fold reader ~init ~f folds over the elements of reader, consuming them as they come in. fold finishes when the final call to f returns.

val fold : 'a Reader.t -> init:'b -> f:('b -> 'a -> 'b) -> 'b Deferred.t
val iter' : 'a Reader.t ->
f:('a Core.Std.Queue.t -> unit Deferred.t) -> unit Deferred.t
iter' reader ~f repeatedly applies f to batches of elements of reader, waiting for each call to f to finish before continuing. The deferred returned by iter' becomes determined when the call to f on the final batch of elements finishes.

iter is a specialization of iter' that uses Deferred.Queue.iter ~f

iter_without_pushback is a specialized version that applies f to each element that arrives on the pipe, without giving f a chance to pushback on the iteration continuing.

val iter : 'a Reader.t -> f:('a -> unit Deferred.t) -> unit Deferred.t
val iter_without_pushback : 'a Reader.t -> f:('a -> unit) -> unit Deferred.t
val transfer' : 'a Reader.t ->
'b Writer.t ->
f:('a Core.Std.Queue.t -> 'b Core.Std.Queue.t Deferred.t) -> unit Deferred.t
transfer' input output ~f ?stop repeatedly reads a batch of elements from input, applies f to the batch, writes the result as a batch to output, and then waits on pushback in output before continuing. transfer' finishes if input is closed, or output is closed, or stop is determined. If output is closed, then transfer' closes input.

transfer is a specialization of transfer' that uses Queue.map ~f.

transfer_id is a specialization of transfer' wifh f = Fn.id.

val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Deferred.t
val transfer_id : 'a Reader.t -> 'a Writer.t -> unit Deferred.t
val map' : 'a Reader.t ->
f:('a Core.Std.Queue.t -> 'b Core.Std.Queue.t Deferred.t) -> 'b Reader.t
map' input ~f returns a reader, output, and repeatedly applies f to batches of elements from input, with the results appearing in output. If values are not being consumed from output, map' will pushback and stop consuming values from input.

If output is closed, then map' will close input.

map is a specialization of map' that uses Queue.map ~f.

val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t
val filter_map' : 'a Reader.t -> f:('a -> 'b option Deferred.t) -> 'b Reader.t
filter_map' input ~f returns a reader, output, and repeatedly applies f to elements from input, with the results that aren't None appearing in output. If values are not being consumed from output, filter_map' will pushback and stop consuming values from input.

If output is closed, then filter_map' will close input.

filter_map is a specialized version of filter_map'.

val filter_map : 'a Reader.t -> f:('a -> 'b option) -> 'b Reader.t
val filter : 'a Reader.t -> f:('a -> bool) -> 'a Reader.t
filter input ~f returns a reader, output, and copies to output each element from input that satisfies the predicate f. If output is closed, then filter closes input.
val interleave : 'a Reader.t list -> 'a Reader.t
interleave inputs returns a reader output, and, for each input, transfers batches of values from that input to output, using transfer_id. Each input is transferred to output independently. So, batches of values from different inputs can be in flight to output simultaneously, but at most one batch at a time from any particular input. The operation is complete when either all the inputs produce EOF, or when output is closed by the downstream consumer (in which case interleave closes all the inputs).
val concat : 'a Reader.t list -> 'a Reader.t
concat inputs return a reader, output, with the values from each pipe in inputs in sequence. concat closes output once it reaches EOF on the final input. If output is closed, then concat closes all its inputs.
val to_stream : 'a Reader.t -> 'a Async_stream.t
to_stream reader returns a stream that reads everything from the pipe.
val of_stream : 'a Async_stream.t -> 'a Reader.t
of_stream reader return a pipe that has one element for every element on the stream
val drain : 'a Reader.t -> unit Deferred.t
drain reader repeatedly reads values from reader and throws them away.

drain_and_count is like drain, except it also counts the number of values it has read.

val drain_and_count : 'a Reader.t -> int Deferred.t
val to_list : 'a Reader.t -> 'a list Deferred.t
to_list input reads everything from input; on EOF, it produces the accumulated list of these values.

Miscellaneous


val hash : ('a, 'b) t -> int
hash a hash function suitable for pipes
val equal : ('a, 'b) t -> ('a, 'b) t -> bool
equal on pipes is physical equality.

Size budget


val size_budget : ('a, 'b) t -> int
Every pipe has a "size budget", which governs the pushback that is used to discourage writers from enqueueing arbitrarily large amounts of data. As long as the length of the pipe exceeds the size budget, writers will not be notified to do further writing. Whenver the length is less than or equal to the size budget, writers will be notified to continue.

Every pipe's initial size budget is zero.

val set_size_budget : ('a, 'b) t -> int -> unit
set_size_budget t i changes the size budget of t to i. Any nonnegative value is allowed.

Debugging


val show_debug_messages : bool Pervasives.ref
show_debug_messages, if true will cause a message to be printed at the start of each operation, showing the pipe and other arguments.
val check_invariant : bool Pervasives.ref
check_invariant, if true, will cause pipes' invariants to be checked at the start of each operation.