Module Async_kernel__.Pipe
val sexp_of_t : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('a, 'phantom) t -> Ppx_sexp_conv_lib.Sexp.t
type ('a, 'phantom) pipe = ('a, 'phantom) t
val sexp_of_pipe : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('a, 'phantom) pipe -> Ppx_sexp_conv_lib.Sexp.t
Reader and Writer modules
module Writer : sig ... end
module Reader : sig ... end
Creation
val create_reader : close_on_exception:bool -> ('a Writer.t -> unit Async_kernel.Deferred.t) -> 'a Reader.t
create_reader ~close_on_exception f creates a new pipe, applies f to its writer end, and returns its reader end. create_reader closes the writer end when the result of f becomes determined. If f raises, then the exception is raised to the caller of create_reader. Whether or not create_reader closes the writer end upon f raising is determined by close_on_exception.
Choosing ~close_on_exception:false is recommended, because normally closing the write end of a pipe is taken to mean that the writer completed successfully. With close_on_exception:true, the caller will both see the pipe closed and an exception will be raised to the monitor in effect when create_reader was called. There is a race between those two actions, which can easily lead to confusion or bugs.
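As a hedged illustration of the recommended ~close_on_exception:false usage, the sketch below builds a reader whose producer writes three integers and then lets create_reader close the pipe. It assumes the public Core and Async libraries are available (open Core, open Async) and uses Thread_safe.block_on_async_exn to drive the scheduler. The names write_upto, numbers and total are illustrative and not part of Pipe.

```ocaml
open Core
open Async

(* Hypothetical helper: write n, n+1, ..., limit, waiting on the pushback
   deferred after each write. *)
let rec write_upto writer ~limit n =
  if n > limit
  then return ()
  else Pipe.write writer n >>= fun () -> write_upto writer ~limit (n + 1)

(* The writer end is closed once the deferred returned by the callback
   becomes determined. *)
let numbers () : int Pipe.Reader.t =
  Pipe.create_reader ~close_on_exception:false (fun writer ->
    write_upto writer ~limit:3 1)

(* Drain the reader and sum what was received. *)
let total =
  Thread_safe.block_on_async_exn (fun () ->
    Pipe.to_list (numbers ()) >>| List.fold ~init:0 ~f:( + ))
```

The consumer never closes anything itself; it sees EOF because the producer's deferred finished.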
val create_writer : ('a Reader.t -> unit Async_kernel.Deferred.t) -> 'a Writer.t
create_writer is symmetric with create_reader. It creates a new pipe, applies f to its reader end, and returns its writer end. create_writer calls close_read when the result of f becomes determined. If f raises, create_writer closes the pipe and raises the exception to the caller of create_writer. create_writer closes on exception, unlike create_reader, because closing the read end of a pipe is a signal to the writer that the consumer has failed.
val init : ('a Writer.t -> unit Async_kernel.Deferred.t) -> 'a Reader.t
val create : ?info:Core_kernel.Sexp.t -> unit -> 'a Reader.t * 'a Writer.t
create () creates a new pipe. It is preferable to use create_reader or create_writer instead of create, since they provide exception handling and automatic closing of the pipe. info is an arbitrary sexp displayed by sexp_of_t, for debugging purposes; see also set_info.
val empty : unit -> _ Reader.t
empty () returns a closed pipe reader with no contents.
val of_list : 'a list -> 'a Reader.t
of_list l returns a closed pipe reader filled with the contents of l.
val singleton : 'a -> 'a Reader.t
singleton x returns a closed pipe reader filled with the single value x.
val unfold : init:'s -> f:('s -> ('a * 's) option Async_kernel.Deferred.t) -> 'a Reader.t
unfold ~init ~f returns a pipe that it fills with 'a values by repeatedly applying f to values of the state type 's. When f returns None, the resulting pipe is closed. unfold respects pushback on the resulting pipe. If f raises, then the pipe is not closed.
For example, to create a pipe of natural numbers:
Pipe.unfold ~init:0 ~f:(fun n -> return (Some (n, n+1)))
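The natural-number pipe above never closes. A finite variant, sketched here under the assumption that Core and Async are opened, returns None to close the pipe; countdown and values are illustrative names only.

```ocaml
open Core
open Async

(* Count down from [start] to 1, then return None so the pipe is closed. *)
let countdown start : int Pipe.Reader.t =
  Pipe.unfold ~init:start ~f:(fun n ->
    return (if n <= 0 then None else Some (n, n - 1)))

(* to_list only becomes determined because the pipe is eventually closed. *)
let values =
  Thread_safe.block_on_async_exn (fun () -> Pipe.to_list (countdown 3))
```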
val of_sequence : 'a Core_kernel.Sequence.t -> 'a Reader.t
of_sequence sequence returns a pipe reader that gets filled with the elements of sequence. of_sequence respects pushback on the resulting pipe.
type 'a to_sequence_elt =
| Value of 'a
| Wait_for : _ Async_kernel.Deferred.t -> _ to_sequence_elt
to_sequence reader returns a sequence that can be consumed to extract values from reader. If Wait_for d is returned, the consumer must wait for d to become determined before pulling the next value. Repeatedly asking for the next value without waiting on d will loop forever.
val to_sequence : 'a Reader.t -> 'a to_sequence_elt Core_kernel.Sequence.t
Closing
val close : _ Writer.t -> unit
close t closes the write end of the pipe:
- Future write attempts will fail, raising an exception.
- If, at the time of the close, there are reads blocked waiting for data, these reads will unblock, producing `Eof.
- Future read attempts will drain the data that was in the pipe at the time of the close, until the pipe's buffer has been exhausted; subsequent reads will immediately get `Eof.
Thus, after a pipe has been closed, reads never block.
close is idempotent.
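The drain-then-`Eof behaviour can be sketched as follows. This is a minimal example assuming Core and Async are opened; first and second are illustrative names.

```ocaml
open Core
open Async

let first, second =
  Thread_safe.block_on_async_exn (fun () ->
    let reader, writer = Pipe.create () in
    (* Queue one value, then close the write end. *)
    Pipe.write_without_pushback writer "a";
    Pipe.close writer;
    (* The buffered value is still readable; the read after it sees `Eof. *)
    Pipe.read reader >>= fun first ->
    Pipe.read reader >>| fun second -> first, second)
```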
val close_read : _ Reader.t -> unit
close_read t closes both the read and write ends of the pipe. It does everything close does, and in addition:
- all pending flushes become determined with `Reader_closed.
- the pipe buffer is cleared.
- all subsequent reads will get `Eof.
val is_closed : (_, _) t -> bool
is_closed t returns true iff close t or close_read t has been called.
val closed : (_, _) t -> unit Async_kernel.Deferred.t
closed t returns a deferred that becomes determined when close t or close_read t is called.
Flushing
module Flushed_result : sig ... end
val upstream_flushed : (_, _) t -> Flushed_result.t Async_kernel.Deferred.t
val downstream_flushed : (_, _) t -> Flushed_result.t Async_kernel.Deferred.t
module Consumer : sig ... end
val add_consumer : _ Reader.t -> downstream_flushed:(unit -> Flushed_result.t Async_kernel.Deferred.t) -> Consumer.t
add_consumer reader ~downstream_flushed creates a new consumer of reader, and causes future calls to downstream_flushed reader to take this consumer into account. Thereafter, Pipe.downstream_flushed reader will first ensure that values previously written to reader have been read, then that they have been sent downstream by the consumer that read them, and finally that they have been flushed downstream.
One should only supply the resulting consumer to read operations on reader. Using a consumer created from one reader with another reader will raise an exception.
Generic pipe operations
val length : (_, _) t -> int
length t returns the number of elements currently queued in t.
val is_empty : (_, _) t -> bool
is_empty t is true iff there are no values in the pipe.
Writing
val pushback : 'a Writer.t -> unit Async_kernel.Deferred.t
pushback writer becomes determined when either writer has been closed or the pipe can accept a new write.
val write : 'a Writer.t -> 'a -> unit Async_kernel.Deferred.t
write writer a enqueues a in writer, returning a pushback deferred, as described above.
transfer_in writer ~from:q transfers the elements from q into writer, leaving q empty, and returning a pushback deferred.
write_without_pushback and transfer_in_without_pushback are alternatives to transfer_in and write that can be used when you don't care about the pushback deferred. They add data to the pipe and return immediately.
The following equivalences hold:
write t a = write_without_pushback t a; pushback t
transfer_in t ~from = transfer_in_without_pushback t ~from; pushback t
If is_closed writer, then all of these functions raise.
val write_without_pushback : 'a Writer.t -> 'a -> unit
val transfer_in : 'a Writer.t -> from:'a Core_kernel.Queue.t -> unit Async_kernel.Deferred.t
val transfer_in_without_pushback : 'a Writer.t -> from:'a Core_kernel.Queue.t -> unit
val write_when_ready : 'a Writer.t -> f:(('a -> unit) -> 'b) -> [ `Closed | `Ok of 'b ] Async_kernel.Deferred.t
write_when_ready writer ~f waits until there is space available in the pipe, and then calls f write, where write can be used by f to write a single value into the pipe at a time. write_when_ready guarantees that the pipe is open when it calls f, and hence that the writes will succeed, unless f itself closes the pipe.
val write_if_open : 'a Writer.t -> 'a -> unit Async_kernel.Deferred.t
write_if_open w e is equivalent to:
let x = e in if not (is_closed w) then (write w x) else (return ())
Note the difference in allocation and potential side effects when w is closed and e is a complex expression.
write_without_pushback_if_open is the same as write_if_open, except it calls write_without_pushback instead of write.
val write_without_pushback_if_open : 'a Writer.t -> 'a -> unit
Reading
val read' : ?consumer:Consumer.t -> ?max_queue_length:int -> 'a Reader.t -> [ `Eof | `Ok of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t
read' pipe reads values available in the pipe, as soon as any value becomes available. The resulting queue will satisfy 0 < Queue.length q <= max_queue_length. read' raises if max_queue_length <= 0. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).
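A small sketch of the batch-size bound, assuming Core and Async are opened; batch_length and remaining are illustrative names. Three values are queued and read' is asked for at most two.

```ocaml
open Core
open Async

let batch_length, remaining =
  Thread_safe.block_on_async_exn (fun () ->
    let reader, writer = Pipe.create () in
    List.iter [ 1; 2; 3 ] ~f:(Pipe.write_without_pushback writer);
    (* At most two of the three queued values come back in one batch. *)
    Pipe.read' ~max_queue_length:2 reader >>| function
    | `Eof -> 0, Pipe.length reader
    | `Ok batch -> Queue.length batch, Pipe.length reader)
```

The value that did not fit in the batch stays queued for the next read.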
val read : ?consumer:Consumer.t -> 'a Reader.t -> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.t
read pipe reads a single value from the pipe. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).
val read_at_most : ?consumer:Consumer.t -> 'a Reader.t -> num_values:int -> [ `Eof | `Ok of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t
read_at_most t ~num_values is read' t ~max_queue_length:num_values.
val read_exactly : ?consumer:Consumer.t -> 'a Reader.t -> num_values:int -> [ `Eof | `Fewer of 'a Core_kernel.Queue.t | `Exactly of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.t
read_exactly r ~num_values reads exactly num_values items, unless EOF is encountered. read_exactly performs a sequence of read_at_most operations, so there is no guarantee that the queue of values it returns comprises a contiguous segment of the written stream of values: other readers might pick off elements in between read_exactly's atomic reads. read_exactly raises if num_values <= 0. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).
val read_now' : ?consumer:Consumer.t -> ?max_queue_length:int -> 'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a Core_kernel.Queue.t ]
read_now' reader reads values from reader that are immediately available. If reader is closed, read_now' returns `Eof. If reader is empty, read_now' returns `Nothing_available. Otherwise, `Ok q is returned, and the resulting queue will satisfy 0 < Queue.length q <= max_queue_length. The consumer is used to extend the meaning of values being flushed (see the Consumer module above).
val read_now : ?consumer:Consumer.t -> 'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a ]
read_now is like read_now', except that it reads a single value rather than everything that is available.
val read_now_at_most : ?consumer:Consumer.t -> 'a Reader.t -> num_values:int -> [ `Eof | `Nothing_available | `Ok of 'a Core_kernel.Queue.t ]
read_now_at_most t ~num_values is read_now' t ~max_queue_length:num_values.
val peek : 'a Reader.t -> 'a option
val clear : 'a Reader.t -> unit
clear reader consumes all of the values currently in reader, and all blocked flushes become determined with `Ok.
val read_all : 'a Reader.t -> 'a Core_kernel.Queue.t Async_kernel.Deferred.t
read_all reader reads all the values from the pipe until it is closed. An alternative name might be Reader.to_queue.
val values_available : _ Reader.t -> [ `Eof | `Ok ] Async_kernel.Deferred.t
values_available reader returns a deferred that becomes determined when there are values in the pipe. If there are multiple readers (a rare situation), there is no guarantee that some other reader hasn't become active because of ordinary Async scheduling and removed some or all of the values between the time the result of values_available becomes determined and the time something waiting upon that result runs. values_available is useful when one wants to choose on values being available in a pipe, so that one can be sure not to remove values and drop them on the floor. values_available is roughly equivalent to read' ~max_queue_length:0.
val read_choice : 'a Reader.t -> [ `Eof | `Ok of 'a | `Nothing_available ] Async_kernel.Deferred.choice
read_choice reader is:
choice (values_available reader) (fun (_ : [ `Ok | `Eof ]) -> read_now reader)
read_choice consumes a value from reader iff the choice is taken. read_choice exists to discourage the broken idiom:
choice (read reader) (fun ...)
which is broken because it reads from reader even if the choice isn't taken. `Nothing_available can only be returned if there is a race condition with one or more other consumers.
read_choice_single_consumer_exn reader [%here] is like read_choice reader, but it raises in the case of `Nothing_available. It is intended to be used when reader has no other consumers.
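To illustrate that only the chosen pipe loses a value, the following sketch (assuming Core and Async are opened, with illustrative names picked and remaining) queues one value in each of two pipes and uses choose over two read_choice values. Which pipe wins is not specified here, but exactly one value should be consumed in total.

```ocaml
open Core
open Async

let picked, remaining =
  Thread_safe.block_on_async_exn (fun () ->
    let a, a_writer = Pipe.create () in
    let b, b_writer = Pipe.create () in
    Pipe.write_without_pushback a_writer 1;
    Pipe.write_without_pushback b_writer 2;
    (* Only the choice that is taken performs its read_now. *)
    choose [ Pipe.read_choice a; Pipe.read_choice b ] >>| fun picked ->
    picked, Pipe.length a + Pipe.length b)
```

With the broken idiom choice (read reader) ..., both pipes would have been read from before choose decided.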
val read_choice_single_consumer_exn : 'a Reader.t -> Core_kernel.Source_code_position.t -> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.choice
Sequence functions
module Flushed : sig ... end
val fold' : ?flushed:Flushed.t -> ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a Core_kernel.Queue.t -> 'accum Async_kernel.Deferred.t) -> 'accum Async_kernel.Deferred.t
fold' reader ~init ~f reads a batch of elements from reader, supplies them to f, waits for f to finish, and then repeats. fold' finishes when the call to f on the final batch of elements from reader finishes.
val fold : ?flushed:Flushed.t -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum Async_kernel.Deferred.t) -> 'accum Async_kernel.Deferred.t
val fold_without_pushback : ?consumer:Consumer.t -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum) -> 'accum Async_kernel.Deferred.t
val iter' : ?continue_on_error:bool -> ?flushed:Flushed.t -> ?max_queue_length:int -> 'a Reader.t -> f:('a Core_kernel.Queue.t -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t
iter' reader ~f repeatedly applies f to batches of elements of reader, waiting for each call to f to finish before continuing. The deferred returned by iter' becomes determined when the call to f on the final batch of elements finishes.
~continue_on_error:true causes the iteration to continue even if f raises.
~flushed:When_value_processed means values in batch b are flushed only after f b is filled.
val iter : ?continue_on_error:bool -> ?flushed:Flushed.t -> 'a Reader.t -> f:('a -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t
iter t ~f is a specialization of iter' that applies f to each element in the batch, waiting for one call to f to finish before making the next call to f.
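A minimal sketch of iter, assuming Core and Async are opened; processed is an illustrative name. Each call to f returns a deferred, and the next element is not handed to f until that deferred is determined.

```ocaml
open Core
open Async

let processed =
  Thread_safe.block_on_async_exn (fun () ->
    let count = ref 0 in
    Pipe.iter (Pipe.of_list [ "a"; "b"; "c" ]) ~f:(fun _item ->
      count := !count + 1;
      (* Returning a slower deferred here would delay the next call to f. *)
      return ())
    >>| fun () -> !count)
```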
val iter_without_pushback : ?consumer:Consumer.t -> ?continue_on_error:bool -> ?max_iterations_per_job:int -> 'a Reader.t -> f:('a -> unit) -> unit Async_kernel.Deferred.t
iter_without_pushback t ~f applies f to each element in t, without giving f a chance to pushback on the iteration continuing. If f raises on some element of t, iter_without_pushback will not consume any further elements. iter_without_pushback will not make more than max_iterations_per_job calls to f in a single Async job; this can be used to increase Async-scheduling fairness.
val transfer' : ?max_queue_length:int -> 'a Reader.t -> 'b Writer.t -> f:('a Core_kernel.Queue.t -> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t
transfer' input output ~f repeatedly reads a batch of elements from input, applies f to the batch, writes the result as a batch to output, and then waits on pushback in output before continuing. transfer' finishes if input is closed or output is closed. If output is closed, then transfer' closes input. Use ~max_queue_length:1 to cause elements to appear on the output pipe as soon as they are processed, without having to wait for the entire queue.
val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Async_kernel.Deferred.t
transfer is like transfer', except that it processes one element at a time.
val transfer_id : ?max_queue_length:int -> 'a Reader.t -> 'a Writer.t -> unit Async_kernel.Deferred.t
transfer_id is a specialization of transfer' with f = Fn.id.
val map' : ?max_queue_length:int -> 'a Reader.t -> f:('a Core_kernel.Queue.t -> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) -> 'b Reader.t
map' input ~f returns a reader, output, and repeatedly applies f to batches of elements from input, with the results appearing in output. If values are not being consumed from output, map' will pushback and stop consuming values from input. If output is closed, then map' will close input. Use ~max_queue_length:1 to cause elements to appear on the output pipe as soon as they are processed, without having to wait for the entire queue.
val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t
map is like map', except that it processes one element at a time.
val folding_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b) -> 'b Reader.t
folding_map is a version of map that threads an accumulator through calls to f.
val fold_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b) -> 'b Reader.t
val filter_map' : ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option Async_kernel.Deferred.t) -> 'b Reader.t
filter_map' input ~f returns a reader, output, and repeatedly applies f to elements from input, with the results that aren't None appearing in output. If values are not being consumed from output, filter_map' will pushback and stop consuming values from input. If output is closed, then filter_map' will close input. filter_map' processes elements in batches as per max_queue_length; in a single batch, all outputs will propagate to the result only when all inputs have been processed.
val filter_map : ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option) -> 'b Reader.t
filter_map is a specialized version of filter_map'.
val folding_filter_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b option) -> 'b Reader.t
folding_filter_map is a version of filter_map that threads an accumulator through calls to f.
val fold_filter_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b option) -> 'b Reader.t
val filter : 'a Reader.t -> f:('a -> bool) -> 'a Reader.t
filter input ~f returns a reader, output, and copies to output each element from input that satisfies the predicate f. If output is closed, then filter closes input.
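filter and map compose into a pipeline in which each stage returns a new reader. The sketch below assumes Core and Async are opened; sum_of_even_squares is an illustrative name.

```ocaml
open Core
open Async

let sum_of_even_squares =
  Thread_safe.block_on_async_exn (fun () ->
    Pipe.of_list [ 1; 2; 3; 4 ]
    (* Keep 2 and 4. *)
    |> Pipe.filter ~f:(fun x -> x mod 2 = 0)
    (* Square each surviving element. *)
    |> Pipe.map ~f:(fun x -> x * x)
    |> Pipe.to_list
    >>| List.fold ~init:0 ~f:( + ))
```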
val interleave : 'a Reader.t list -> 'a Reader.t
interleave inputs returns a reader, output, and, for each input, transfers batches of values from that input to output, using transfer_id. Each input is transferred to output independently. So, batches of values from different inputs can be in flight to output simultaneously, but at most one batch at a time from any particular input. The operation is complete when either all the inputs produce EOF, or when output is closed by the downstream consumer (in which case interleave closes all the inputs).
val interleave_pipe : 'a Reader.t Reader.t -> 'a Reader.t
val merge : 'a Reader.t list -> compare:('a -> 'a -> int) -> 'a Reader.t
merge inputs ~compare returns a reader, output, that merges all the inputs. Assuming that for each input, values are sorted according to the comparison function compare, values for each input will be transferred to output and the values returned by output will be sorted according to compare.
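As a sketch of merging two already-sorted inputs (assuming Core and Async are opened; merged is an illustrative name):

```ocaml
open Core
open Async

let merged =
  Thread_safe.block_on_async_exn (fun () ->
    (* Each input is individually sorted with respect to Int.compare. *)
    let odds = Pipe.of_list [ 1; 3; 5 ] in
    let evens = Pipe.of_list [ 2; 4; 6 ] in
    Pipe.to_list (Pipe.merge [ odds; evens ] ~compare:Int.compare))
```

If an input is not sorted according to compare, the assumption stated above does not hold and the output order is not guaranteed.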
val concat : 'a Reader.t list -> 'a Reader.t
concat inputs returns a reader, output, with the values from each pipe in inputs in sequence. concat closes output once it reaches EOF on the final input. If output is closed, then concat closes all its inputs.
val fork : 'a Reader.t -> pushback_uses:[ `Both_consumers | `Fast_consumer_only ] -> 'a Reader.t * 'a Reader.t
fork input returns a pair of readers and transfers each of the values in input into both of the returned readers. It closes input early if both of the readers are closed early.
If pushback_uses = `Both_consumers, then fork waits for pushback on both readers when writing. If one of the readers is not read from or is slow to be read from, it may block the other from receiving data. Beware of possible deadlocks in downstream code due to blocking on reading too many elements from one before reading the other.
If pushback_uses = `Fast_consumer_only, then fork waits for pushback only on the faster of the two readers when writing. In this case the slow reader cannot block the faster one, but fork could be forced to buffer arbitrarily many elements. Beware of unbounded resource usage in downstream code where one reader might fall behind.
Note that upstream_flushed will not work with the pipes returned by fork.
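With `Both_consumers, reading the two branches concurrently avoids the blocking described above. A minimal sketch, assuming Core and Async are opened, with illustrative names left and right:

```ocaml
open Core
open Async

let left, right =
  Thread_safe.block_on_async_exn (fun () ->
    let a, b =
      Pipe.fork (Pipe.of_list [ 1; 2; 3 ]) ~pushback_uses:`Both_consumers
    in
    (* Drain both branches at the same time so neither stalls the other. *)
    Deferred.both (Pipe.to_list a) (Pipe.to_list b))
```

Draining a to completion before touching b is the pattern the deadlock warning is about.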
val to_stream_deprecated : 'a Reader.t -> 'a Async_kernel__.Async_stream.t
to_stream_deprecated reader returns a stream that reads everything from the pipe. This function is deprecated because one should change the code that is consuming a stream to instead consume from a pipe reader.
val of_stream_deprecated : 'a Async_kernel__.Async_stream.t -> 'a Reader.t
of_stream_deprecated stream returns a pipe that has one element for every element on the stream. This function is deprecated because one should change the code that is producing a stream to instead produce a pipe reader.
val drain : 'a Reader.t -> unit Async_kernel.Deferred.t
drain reader repeatedly reads values from reader and throws them away.
drain_and_count is like drain, except it also counts the number of values it has read.
val drain_and_count : 'a Reader.t -> int Async_kernel.Deferred.t
val to_list : 'a Reader.t -> 'a list Async_kernel.Deferred.t
to_list input reads everything from input; on EOF, it produces the accumulated list of these values.
Miscellaneous
val hash : (_, _) t -> int
hash is a hash function based on the internal id of the pipe.
Size budget
val size_budget : (_, _) t -> int
Every pipe has a "size budget", which governs the pushback that is used to discourage writers from enqueueing arbitrarily large amounts of data. As long as the length of the pipe exceeds the size budget, writers will not be notified to do further writing. Whenever the length is less than or equal to the size budget, writers will be notified to continue.
Every pipe's initial size budget is zero.
val set_size_budget : (_, _) t -> int -> unit
set_size_budget t i changes the size budget of t to i. Any nonnegative value is allowed.
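The interaction between the size budget and pushback can be sketched as below. This assumes Core and Async are opened, and it assumes the deferred returned by pushback is already determined whenever the pipe is within budget, which follows the description above. The names within_budget and over_budget are illustrative.

```ocaml
open Core
open Async

let within_budget, over_budget =
  Thread_safe.block_on_async_exn (fun () ->
    let _reader, writer = Pipe.create () in
    Pipe.set_size_budget writer 2;
    Pipe.write_without_pushback writer "a";
    Pipe.write_without_pushback writer "b";
    (* Length 2 does not exceed the budget of 2. *)
    let within = Deferred.is_determined (Pipe.pushback writer) in
    Pipe.write_without_pushback writer "c";
    (* Length 3 exceeds the budget, so the writer is asked to wait. *)
    let over = Deferred.is_determined (Pipe.pushback writer) in
    return (within, over))
```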
Debugging
val show_debug_messages : bool Core_kernel.ref
show_debug_messages, if true, will cause a message to be printed at the start of each operation, showing the pipe and other arguments.
val check_invariant : bool Core_kernel.ref
check_invariant, if true, will cause pipes' invariants to be checked at the start of each operation.
val set_info : (_, _) t -> Core_kernel.Sexp.t -> unit
set_info updates t's info field, which is displayed by sexp_of_t, and thus in debugging messages.