Module Async_kernel__.Pipe
val sexp_of_t : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('a, 'phantom) t -> Ppx_sexp_conv_lib.Sexp.t
type ('a, 'phantom) pipe= ('a, 'phantom) t
val sexp_of_pipe : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> ('phantom -> Ppx_sexp_conv_lib.Sexp.t) -> ('a, 'phantom) pipe -> Ppx_sexp_conv_lib.Sexp.t
Reader and Writer modules
module Writer : sig ... endmodule Reader : sig ... endCreation
val create_reader : close_on_exception:bool -> ('a Writer.t -> unit Async_kernel.Deferred.t) -> 'a Reader.tcreate_reader ~close_on_exception fcreates a new pipe, appliesfto its writer end, and returns its reader end.create_readercloses the writer end when the result offbecomes determined. Iffraises, then the exception is raised to the caller ofcreate_reader. Whether or notcreate_readercloses the writer end uponfraising is determined byclose_on_exception.Choosing
~close_on_exception:falseis recommended, because normally closing the write end of a pipe is taken to mean that the writer completed successfully. Withclose_on_exception:true, the caller will both see the pipe closed and an exception will be raised to the monitor in effect whencreate_readerwas called. There is a race between those two actions, which can easily lead to confusion or bugs.
val create_writer : ('a Reader.t -> unit Async_kernel.Deferred.t) -> 'a Writer.tcreate_writeris symmetric withcreate_reader. It creates a new pipe, appliesfto its reader end, and returns its writer end.create_writercallsclose_readwhen the result offbecomes determined. Iffraises,create_writercloses the pipe and raises the exception to the caller ofcreate_writer.create_writercloses on exception, unlikecreate_reader, because closing closing the read end of a pipe is a signal to the writer that the consumer has failed.
val create : ?info:Core_kernel.Sexp.t -> unit -> 'a Reader.t * 'a Writer.tcreate ()creates a new pipe. It is preferable to usecreate_readerorcreate_writerinstead ofcreate, since they provide exception handling and automatic closing of the pipe.infois an arbitrary sexp displayed bysexp_of_t, for debugging purposes; see alsoset_info.
val empty : unit -> _ Reader.tempty ()returns a closed pipe reader with no contents.
val of_list : 'a list -> 'a Reader.tof_list lreturns a closed pipe reader filled with the contents ofl.
val singleton : 'a -> 'a Reader.tsingleton xreturns a closed pipe reader filled with the single valuex.
val unfold : init:'s -> f:('s -> ('a * 's) option Async_kernel.Deferred.t) -> 'a Reader.tunfold ~init ~freturns a pipe that it fills with'as by repeatedly applyingfto values of the state type's. WhenfreturnsNone, the resulting pipe is closed.unfoldrespects pushback on the resulting pipe. Iffraises, then the pipe is not closed.For example, to create a pipe of natural numbers:
Pipe.unfold ~init:0 ~f:(fun n -> return (Some (n, n+1)))
val of_sequence : 'a Core_kernel.Sequence.t -> 'a Reader.tof_sequence sequencereturns a pipe reader that gets filled with the elements ofsequence.of_sequencerespects pushback on the resulting pipe.
type 'a to_sequence_elt=|Value of 'a|Wait_for : _ Async_kernel.Deferred.t -> _ to_sequence_eltto_sequence readerreturns a sequence that can be consumed to extract values fromreader. IfWait_for dis returned, the consumer must wait fordto become determined before pulling the next value. Repeatedly asking for the next value without waiting ondwill infinite loop.
val to_sequence : 'a Reader.t -> 'a to_sequence_elt Core_kernel.Sequence.t
Closing
val close : _ Writer.t -> unitclose tcloses the write end of the pipe:- Future write attempts will fail, raising an exception.
- If, at the time of the close, there are reads blocked waiting for data, these reads will unblock, producing
`Eof.
- Future read attempts will drain the data that was in the pipe at the time of the close, until the pipe's buffer has been exhausted; subsequent reads will immediately get
`Eof.
Thus, after a pipe has been closed, reads never block.
closeis idempotent.
val close_read : _ Reader.t -> unitclose_read tcloses both the read and write ends of the pipe. It does everythingclosedoes, and in addition:- all pending flushes become determined with
`Reader_closed. - the pipe buffer is cleared.
- all subsequent reads will get
`Eof.
- all pending flushes become determined with
val is_closed : (_, _) t -> boolis_closed treturnstrueiffclose torclose_read thas been called.
val closed : (_, _) t -> unit Async_kernel.Deferred.tclosed treturns a deferred that becomes determined whenclose torclose_read tis called.
Flushing
module Flushed_result : sig ... endval upstream_flushed : (_, _) t -> Flushed_result.t Async_kernel.Deferred.tval downstream_flushed : (_, _) t -> Flushed_result.t Async_kernel.Deferred.t
module Consumer : sig ... endval add_consumer : _ Reader.t -> downstream_flushed:(unit -> Flushed_result.t Async_kernel.Deferred.t) -> Consumer.tadd_consumer reader ~downstream_flushedcreates a new consumer ofreader, and causes future calls toflushed_downstream readerto take this consumer into account. Thereafter,Pipe.flushed_downstream readerwill first ensure that values previously written toreaderhave been read, then that they have been sent downstream by the consumer that read them, and finally that they have been flushed downstream.One should only supply the resulting consumer to read operations on
reader. Using a consumer created from one reader with another reader will raise an exception.
Generic pipe operations
val length : (_, _) t -> intlength treturns the number of elements currently queued int.
val is_empty : (_, _) t -> boolis_empty tis true iff there are no values in the pipe.
Writing
val pushback : 'a Writer.t -> unit Async_kernel.Deferred.tpushback writerbecomes determined when eitherwriterhas been closed or the pipe can accept a new write.
val write : 'a Writer.t -> 'a -> unit Async_kernel.Deferred.twrite writer aenqueuesainwriter, returning a pushback deferred, as described above.transfer_in writer ~from:qtransfers the elements fromqintowriter, leavingqempty, and returning a pushback deferred.write_without_pushbackandtransfer_in_without_pushbackare alternatives totransfer_inandwritethat can be used when you don't care about the pushback deferred. They add data to the pipe and return immediately.The following equivalences hold:
write t a = write_without_pushback t a; pushback ttransfer_in t ~from = transfer_in_without_pushback t ~from; pushback t
If
is_closed writer, then all of these functions raise.
val write_without_pushback : 'a Writer.t -> 'a -> unitval transfer_in : 'a Writer.t -> from:'a Core_kernel.Queue.t -> unit Async_kernel.Deferred.tval transfer_in_without_pushback : 'a Writer.t -> from:'a Core_kernel.Queue.t -> unitval write_when_ready : 'a Writer.t -> f:(('a -> unit) -> 'b) -> [ `Closed | `Ok of 'b ] Async_kernel.Deferred.twrite_when_ready writer ~fwaits until there is space available in the pipe, and then callsf write, wherewritecan be used byfto write a single value into the pipe at a time.write_when_readyguarantees that the pipe is open when it callsf, and hence that the writes will succeed, unlessfitself closes the pipe.
val write_if_open : 'a Writer.t -> 'a -> unit Async_kernel.Deferred.twrite_if_open w eis equivalent to:let x = e in if not (is_closed w) then (write w x) else (return ())Note the difference in allocation and potential side effects when
wis closed andeis a complex expression.write_without_pushback_if_openis the same aswrite_if_open, except it callswrite_without_pushbackinstead ofwrite.
val write_without_pushback_if_open : 'a Writer.t -> 'a -> unit
Reading
val read' : ?consumer:Consumer.t -> ?max_queue_length:int -> 'a Reader.t -> [ `Eof | `Ok of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.tread' pipereads values available in the pipe, as soon as any value becomes available. The resulting queue will satisfy0 < Queue.length q <= max_queue_length.read'raises ifmax_queue_length <= 0. Theconsumeris used to extend the meaning of values being flushed (see theConsumermodule above).
val read : ?consumer:Consumer.t -> 'a Reader.t -> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.tread pipereads a single value from the pipe. Theconsumeris used to extend the meaning of values being flushed (see theConsumermodule above).
val read_exactly : ?consumer:Consumer.t -> 'a Reader.t -> num_values:int -> [ `Eof | `Fewer of 'a Core_kernel.Queue.t | `Exactly of 'a Core_kernel.Queue.t ] Async_kernel.Deferred.tread_exactly r ~num_valuesreads exactlynum_valuesitems, unless EOF is encountered.read_exactlyperforms a sequence ofread_at_mostoperations, so there is no guarantee that the queue of values it returns comprise a contiguous segment of the written stream of values -- other readers might pick off elements in-betweenread_exactly's atomic reads.read_exactlyraises ifnum_values <= 0. Theconsumeris used to extend the meaning of values being flushed (see theConsumermodule above).
val read_now' : ?consumer:Consumer.t -> ?max_queue_length:int -> 'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a Core_kernel.Queue.t ]read_now' readerreads values fromreaderthat are immediately available. Ifreaderis closed,read_now'returns`Eof. Ifreaderis empty,read_now'returns`Nothing_available. Otherwise,`Ok qis returned, and the resulting queue will satisfy0 < Q.length q <= max_queue_length. Theconsumeris used to extend the meaning of values being flushed (see theConsumermodule above).
val read_now : ?consumer:Consumer.t -> 'a Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a ]read_nowis likeread_now', except that it reads a single value rather than everything that is available.
val peek : 'a Reader.t -> 'a optionval clear : 'a Reader.t -> unitclear readerconsumes all of the values currently inreader, and all blocked flushes become determined with`Ok.
val read_all : 'a Reader.t -> 'a Core_kernel.Queue.t Async_kernel.Deferred.tread_all readerreads all the values from the pipe until it is closed. An alternative name might beReader.to_queue.
val values_available : _ Reader.t -> [ `Eof | `Ok ] Async_kernel.Deferred.tvalues_available readerreturns a deferred that becomes determined when there are values in the pipe. If there are multiple readers (a rare situation), there is no guarantee that some other reader hasn't become active because of ordinary Async scheduling and removed some or all of the values between the time the result ofvalues_availablebecomes determined and the time something waitinguponthat result runs.values_availableis useful when one wants tochooseon values being available in a pipe, so that one can be sure and not remove values and drop them on the floor.values_availableis roughly equivalent toread' ~max_queue_length:0.
val read_choice : 'a Reader.t -> [ `Eof | `Ok of 'a | `Nothing_available ] Async_kernel.Deferred.Choice.tread_choice readeris:choice (values_available reader) (fun (_ : [ `Ok | `Eof ]) -> read_now reader)read_choiceconsumes a value fromreaderiff the choice is taken.read_choiceexists to discourage the broken idiom:choice (read reader) (fun ...)which is broken because it reads from
readereven if the choice isn't taken.`Nothing_availablecan only be returned if there is a race condition with one or more other consumers.read_choice_single_consumer_exn reader [%here]is likeread_choice reader, but it raises in the case of`Nothing_available. It is intended to be used whenreaderhas no other consumers.
val read_choice_single_consumer_exn : 'a Reader.t -> Core_kernel.Source_code_position.t -> [ `Eof | `Ok of 'a ] Async_kernel.Deferred.Choice.t
Sequence functions
module Flushed : sig ... endval fold' : ?flushed:Flushed.t -> ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a Core_kernel.Queue.t -> 'accum Async_kernel.Deferred.t) -> 'accum Async_kernel.Deferred.tfold' reader ~init ~freads a batch of elements fromreader, supplies them tof, waits forfto finish, and then repeats.fold'finishes when the call tofon the final batch of elements fromreaderfinishes.
val fold : ?flushed:Flushed.t -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum Async_kernel.Deferred.t) -> 'accum Async_kernel.Deferred.tval fold_without_pushback : ?consumer:Consumer.t -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum) -> 'accum Async_kernel.Deferred.tval iter' : ?continue_on_error:bool -> ?flushed:Flushed.t -> ?max_queue_length:int -> 'a Reader.t -> f:('a Core_kernel.Queue.t -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.titer' reader ~frepeatedly appliesfto batches of elements ofreader, waiting for each call tofto finish before continuing. The deferred returned byiter'becomes determined when the call tofon the final batch of elements finishes.~continue_on_error:truecauses the iteration to continue even iffraises.~flushed:When_value_processedmeans values in batchbare flushed only afterf bis filled.
val iter : ?continue_on_error:bool -> ?flushed:Flushed.t -> 'a Reader.t -> f:('a -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.titer t fis a specialization ofiter'that applies thefto each element in the batch, waiting for one call tofto finish before making the next call tof.
val iter_without_pushback : ?consumer:Consumer.t -> ?continue_on_error:bool -> ?max_iterations_per_job:int -> 'a Reader.t -> f:('a -> unit) -> unit Async_kernel.Deferred.titer_without_pushback t ~fappliesfto each element int, without givingfa chance to pushback on the iteration continuing. Iffraises on some element oft,iter_without_pushbackwill not consume any further elements.iter_without_pushbackwill not make more thanmax_iterations_per_jobcalls tofin a single Async_job; this can be used to increase Async-scheduling fairness.
val transfer' : ?max_queue_length:int -> 'a Reader.t -> 'b Writer.t -> f:('a Core_kernel.Queue.t -> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.ttransfer' input output ~frepeatedly reads a batch of elements frominput, appliesfto the batch, writes the result as a batch tooutput, and then waits onpushbackinoutputbefore continuing.transfer'finishes ifinputis closed oroutputis closed. Ifoutputis closed, thentransfer'closesinput. Use~max_queue_length:1to cause elements to appear on the output pipe as soon as they are processed, without having to wait for the entire queue.
val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Async_kernel.Deferred.ttransferis liketransfer', except that it processes one element at a time.
val transfer_id : ?max_queue_length:int -> 'a Reader.t -> 'a Writer.t -> unit Async_kernel.Deferred.ttransfer_idis a specialization oftransfer'withf = Fn.id.
val map' : ?max_queue_length:int -> 'a Reader.t -> f:('a Core_kernel.Queue.t -> 'b Core_kernel.Queue.t Async_kernel.Deferred.t) -> 'b Reader.tmap' input ~freturns a reader,output, and repeatedly appliesfto batches of elements frominput, with the results appearing inoutput. If values are not being consumed fromoutput,map'will pushback and stop consuming values frominput. Ifoutputis closed, thenmap'will closeinput. Use~max_queue_length:1to cause elements to appear on the output pipe as soon as they are processed, without having to wait for the entire queue.
val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.tmapis likemap', except that it processes one element at a time.
val folding_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b) -> 'b Reader.tfolding_mapis a version ofmapthat threads an accumulator through calls tof.
val filter_map' : ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option Async_kernel.Deferred.t) -> 'b Reader.tfilter_map' input ~freturns a reader,output, and repeatedly appliesfto elements frominput, with the results that aren'tNoneappearing inoutput. If values are not being consumed fromoutput,filter_map'will pushback and stop consuming values frominput. Ifoutputis closed, thenfilter_map'will closeinput.filter_map'processes elements in batches as permax_queue_length; in a single batch, all outputs will propagate to the result only when all inputs have been processed.
val filter_map : ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option) -> 'b Reader.tfilter_mapis a specialized version offilter_map'.
val folding_filter_map' : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> ('accum * 'b option) Async_kernel.Deferred.t) -> 'b Reader.tfolding_filter_map'is a version offilter_map'that threads an accumulator through calls tof. Likefilter_map',folding_filter_map'processes elements in batches as permax_queue_length; in a single batch, all outputs will propagate to the result only when all inputs have been processed.
val folding_filter_map : ?max_queue_length:int -> 'a Reader.t -> init:'accum -> f:('accum -> 'a -> 'accum * 'b option) -> 'b Reader.tfolding_filter_mapis a specialized version offolding_filter_map'.
val filter : 'a Reader.t -> f:('a -> bool) -> 'a Reader.tfilter input ~freturns a reader,output, and copies tooutputeach element frominputthat satisfies the predicatef. Ifoutputis closed, thenfilterclosesinput.
val interleave : 'a Reader.t list -> 'a Reader.tinterleave inputsreturns a reader,output, and, for each input, transfers batches of values from that input tooutput, usingtransfer_id. Each input is transferred tooutputindependently. So, batches of values from different inputs can be in flight tooutputsimultaneously, but at most one batch at a time from any particular input. The operation is complete when either all theinputsproduce EOF, or whenoutputis closed by the downstream consumer (in which caseinterleavecloses all theinputs).
val interleave_pipe : 'a Reader.t Reader.t -> 'a Reader.tval merge : 'a Reader.t list -> compare:('a -> 'a -> int) -> 'a Reader.tmerge inputs ~comparereturns a reader,output, that merges all the inputs. Assuming that for each input, values are sorted according to the comparison functioncompare, values for each input will be transfered tooutputand the values returned byoutputwill be sorted according tocompare.
val concat : 'a Reader.t list -> 'a Reader.tconcat inputsreturn a reader,output, with the values from each pipe ininputsin sequence.concatclosesoutputonce it reaches EOF on the final input. Ifoutputis closed, thenconcatcloses all its inputs.
val concat_pipe : 'a Reader.t Reader.t -> 'a Reader.tconcat_pipeis likeconcat, but it takes a pipe of inputs instead of a list, and closes the input pipe when the output pipe is closed.
val fork : 'a Reader.t -> pushback_uses:[ `Both_consumers | `Fast_consumer_only ] -> 'a Reader.t * 'a Reader.tfork inputreturns a pair of readers and transfers each of the values ininputinto both of the returned readers. It closesinputearly if both of the readers are closed early.If
pushback_uses = `Both_consumers, thenforkwaits forpushbackon both readers when writing. If one of the readers is not read from or is slow to be read from, it may block the other from receiving data. Beware of possible deadlocks in downstream code due to blocking on reading too many elements from one before reading the other.If
pushback_uses = `Fast_consumer_only, thenforkwaits forpushbackonly on the faster of the two readers when writing. In this case the slow reader cannot block the faster one, butforkcould be forced to buffer arbitrarily many elements. Beware of unbounded resource usage in downstream code where one reader might fall behind.Note that
upstream_flushedwill not work with the pipes returned byfork.
val to_stream_deprecated : 'a Reader.t -> 'a Async_kernel__.Async_stream.tto_stream_deprecated readerreturns a stream that reads everything from the pipe. This function is deprecated because one should change the code that is consuming a stream to instead consume from a pipe reader.
val of_stream_deprecated : 'a Async_kernel__.Async_stream.t -> 'a Reader.tof_stream_deprecated readerreturns a pipe that has one element for every element on the stream. This function is deprecated because one should change the code that is producing a stream to instead produce a pipe reader.
val drain : 'a Reader.t -> unit Async_kernel.Deferred.tdrain readerrepeatedly reads values fromreaderand throws them away.drain_and_countis likedrain, except it also counts the number of values it has read.
val drain_and_count : 'a Reader.t -> int Async_kernel.Deferred.tval to_list : 'a Reader.t -> 'a list Async_kernel.Deferred.tto_list inputreads everything frominput; on EOF, it produces the accumulated list of these values.
Miscellaneous
val hash : (_, _) t -> inthashis a hash function based on the internal id of the pipe.
Size budget
val size_budget : (_, _) t -> intEvery pipe has a "size budget", which governs the pushback that is used to discourage writers from enqueueing arbitrarily large amounts of data. As long as the length of the pipe exceeds the size budget, writers will not be notified to do further writing. Whenever the length is less than or equal to the size budget, writers will be notified to continue.
Every pipe's initial size budget is zero.
val set_size_budget : (_, _) t -> int -> unitset_size_budget t ichanges the size budget ofttoi. Any nonnegative value is allowed.
Debugging
val show_debug_messages : bool Core_kernel.refshow_debug_messages, if true, will cause a message to be printed at the start of each operation, showing the pipe and other arguments.
val check_invariant : bool Core_kernel.refcheck_invariant, if true, will cause pipes' invariants to be checked at the start of each operation.
val set_info : (_, _) t -> Core_kernel.Sexp.t -> unitset_infoupdatest'sinfofield, which is displayed bysexp_of_t, and thus in debugging messages.