Module Async_unix.Reader
Reader is Async's main API for buffered input from a file descriptor. It is the analog of Stdio.In_channel.
Each reader has an internal buffer, which is filled via read() system calls when data is needed to satisfy a Reader.read* call.
Each of the read functions returns a deferred that will become determined when the read completes. It is an error to have two simultaneous reads. That is, if you call a read function, you should not call another read function until the first one completes.
If the file descriptor underlying a reader is closed, the reader will return EOF (after all the buffered bytes have been read).
Any Reader.read* call could, rather than determine its result, send an exception to the monitor in effect when read was called. Such exceptions can be handled in the usual way by using try_with, e.g.:
try_with (fun () -> Reader.read reader ...) module Read_result : sig ... endmodule Id : Core.Unique_idval sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
include Async_unix__.Import.Invariant.S with type t := t
val invariant : t Base__.Invariant_intf.inv
val io_stats : Io_stats.tOverall IO statistics for all readers.
val last_read_time : t -> Core.Time.tReturns time of the most recent
readsystem call that returned data.
val stdin : t Core.Lazy.tstdinis a reader for file descriptor 0. It is lazy because we don't want to create it in all programs that happen to link with Async.
val open_file : ?buf_len:int -> string -> t Async_unix__.Import.Deferred.topen_file fileopensfilefor reading and returns a reader reading from it.This may raise an exception for the typical reasons that an
open(2)system call may fail. If it does raise, it's guaranteed to be aUnix_errorvariant.
val transfer : t -> string Async_unix__.Import.Pipe.Writer.t -> unit Async_unix__.Import.Deferred.ttransfer t pipe_wtransfers data fromtintopipe_wone chunk at a time (whatever is read from the underlying file descriptor without post-processing). The result becomes determined after reaching EOF ontand the final bytes have been transferred, or ifpipe_wis closed.This function will normally not be needed (see
pipe).
val pipe : t -> string Async_unix__.Import.Pipe.Reader.tpipe treturns the reader end of a pipe that will continually be filled with chunks of data from the underlyingReader.t. When the reader reaches EOF or the pipe is closed,pipecloses the reader, and then after the reader close is finished, closes the pipe.
val of_pipe : Core.Info.t -> string Async_unix__.Import.Pipe.Reader.t -> t Async_unix__.Import.Deferred.tof_pipe info pipe_rreturns a readertthat receives all the data frompipe_r. Ifpipe_ris closed,twill see an EOF (but will not be automatically closed). Iftis closed, thenpipe_rwill stop being drained.of_pipeis implemented by shuttling bytes frompipe_rto the write-end of a Unix pipe, withtbeing attached to the read end of the Unix pipe.
val create : ?buf_len:int -> Fd.t -> tcreate ~buf_len fdcreates a new reader that is reading fromfd.
val of_in_channel : Core.In_channel.t -> Fd.Kind.t -> tval with_file : ?buf_len:int -> ?exclusive:bool -> string -> f:(t -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.twith_file file fopensfiles, creates a reader with it, and passes the reader tof. It closes the reader when the result offbecomes determined, and returnsf's result.This may raise an exception for the typical reasons that an
open(2)system call may fail. If it does raise beforefis called, it's guaranteed to be aUnix_errorvariant.Note: You need to be careful that all your IO is done when the deferred you return becomes determined. If for example you use
with_fileand calllines, make sure you return a deferred that becomes determined when the EOF is reached on the pipe, not when you get the pipe (because you get it straight away).
val close : t -> unit Async_unix__.Import.Deferred.tclose tprevents further use oftand closest's underlying file descriptor. The result ofclosebecomes determined once the underlying file descriptor has been closed. It is an error to call other operations ontafterclose thas been called, except that calls ofclosesubsequent to the original call toclosewill return the same deferred as the original call.close_finished tbecomes determined aftert's underlying file descriptor has been closed, i.e., it is the same as the result ofclose.close_finisheddiffers fromclosein that it does not have the side effect of initiating a close.is_closed treturnstrueiffclose thas been called.with_close t ~frunsf (), and closestafterffinishes or raises.
val close_finished : t -> unit Async_unix__.Import.Deferred.tval is_closed : t -> boolval with_close : t -> f:(unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.tval id : t -> Id.tidreturns a name for this reader that is unique across all instances of the reader module.
val read : t -> ?pos:int -> ?len:int -> Core.Bytes.t -> int Read_result.t Async_unix__.Import.Deferred.tread t ?pos ?len bufreads up tolenbytes intobuf, blocking until some data is available or EOF is reached. The resultingisatisfies0 < i <= len.
val peek : t -> len:int -> string Read_result.t Async_unix__.Import.Deferred.tpeek t ~lenpeeks exactlylenbytes fromt's buffer. It blocks untillenbytes are available or EOF is reached.
val bytes_available : t -> intReports how many bytes of data are currently in the reader's buffer.
val read_available : t -> ?pos:int -> ?len:int -> Core.Bytes.t -> intConsumes data from the reader's buffer without performing any additional I/O.
val peek_available : t -> len:int -> stringReads up to
lenbytes from the reader's buffer without consuming it and without performing any additional I/O.
val drain : t -> unit Async_unix__.Import.Deferred.tdrain treads and ignores all data fromtuntil it hits EOF, and then closest.
type 'a read_one_chunk_at_a_time_result=[]
val sexp_of_read_one_chunk_at_a_time_result : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a read_one_chunk_at_a_time_result -> Ppx_sexp_conv_lib.Sexp.t
type 'a handle_chunk_result=[]
val sexp_of_handle_chunk_result : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a handle_chunk_result -> Ppx_sexp_conv_lib.Sexp.t
val read_one_chunk_at_a_time : t -> handle_chunk:(Core.Bigstring.t -> pos:int -> len:int -> 'a handle_chunk_result Async_unix__.Import.Deferred.t) -> 'a read_one_chunk_at_a_time_result Async_unix__.Import.Deferred.tread_one_chunk_at_a_time t ~handle_chunkreads intot's internal buffer, and whenever bytes are available, applieshandle_chunkto them. It waits to read again until the deferred returned byhandle_chunkbecomes determined. Ifhandle_chunkreturns`Consumed, thenread_one_chunk_at_a_timewill wait for additional data to arrive before callinghandle_chunkagain. Thus,handle_chunkshould consume as much as possible.read_one_chunk_at_a_timecontinues reading until it reaches`Eoforhandle_chunkreturns`Stopor`Stop_consumed. In the case of`Stopand`Stop_consumed, one may read fromtafterread_one_chunk_at_a_timereturns.
type 'a handle_iobuf_result=[|`Stop of 'a|`Continue]`Stop aor`Continuerespects the usualIobufsemantics where data up to theIobuf.Lo_boundis considered consumed.
val sexp_of_handle_iobuf_result : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a handle_iobuf_result -> Ppx_sexp_conv_lib.Sexp.t
val read_one_iobuf_at_a_time : t -> handle_chunk:((Core.read_write, Core.Iobuf.seek) Core.Iobuf.t -> 'a handle_iobuf_result Async_unix__.Import.Deferred.t) -> 'a read_one_chunk_at_a_time_result Async_unix__.Import.Deferred.tread_one_iobuf_at_a_timeis likeread_one_chunk_at_a_time, except that the user-suppliedhandle_chunkfunction receives its data in anIobuf.t, and uses theIobufposition to communicate how much data was consumed.read_one_iobuf_at_a_timeis implemented as a wrapper aroundread_one_chunk_at_a_time.
val read_substring : t -> Core.Substring.t -> int Read_result.t Async_unix__.Import.Deferred.tread_substring t ssreads up toSubstring.length ssbytes intoss, blocking until some data is available or EOF is reached. The resultingisatisfies0 < i <= Substring.length ss.
val read_bigsubstring : t -> Core.Bigsubstring.t -> int Read_result.t Async_unix__.Import.Deferred.tval read_char : t -> char Read_result.t Async_unix__.Import.Deferred.tval really_read : t -> ?pos:int -> ?len:int -> Core.Bytes.t -> [ `Ok | `Eof of int ] Async_unix__.Import.Deferred.treally_read t buf ?pos ?lenreads until it fillslenbytes ofbufstarting atpos, or runs out of input. In the former case it returns`Ok. In the latter, it returns`Eof nwherenis the number of bytes that were read before end of input, and0 <= n < String.length ss.
val really_read_substring : t -> Core.Substring.t -> [ `Ok | `Eof of int ] Async_unix__.Import.Deferred.tval really_read_bigsubstring : t -> Core.Bigsubstring.t -> [ `Ok | `Eof of int ] Async_unix__.Import.Deferred.tval read_until : t -> [ `Pred of char -> bool | `Char of char ] -> keep_delim:bool -> [ `Ok of string | `Eof_without_delim of string | `Eof ] Async_unix__.Import.Deferred.tread_until t pred ~keep_delimreads until it hits a delimitercsuch that:- if
pred = `Char c'thenc = c' - if
pred = `Pred pthenp c
`Char c'is equivalent to`Pred (fun c -> c = c')but the underlying implementation is more efficient, in particular it will not call a function on every input character.read_untilreturns a freshly-allocated string consisting of all the characters read and optionally including the delimiter as perkeep_delim.- if
val read_until_max : t -> [ `Pred of char -> bool | `Char of char ] -> keep_delim:bool -> max:int -> [ `Ok of string | `Eof_without_delim of string | `Eof | `Max_exceeded of string ] Async_unix__.Import.Deferred.tread_until_maxis just likeread_until, except you have the option of specifying a maximum number of chars to read.
val read_line : t -> string Read_result.t Async_unix__.Import.Deferred.tread_line treads up to and including the next newline (\n) character (or\r\n) and returns a freshly-allocated string containing everything up to but not including the newline character. Ifread_lineencounters EOF before the newline char then everything read up to but not including EOF will be returned as a line.
val really_read_line : wait_time:Core.Time.Span.t -> t -> string option Async_unix__.Import.Deferred.treally_read_line ~wait_time treads up to and including the next newline (\n) character and returns an optional, freshly-allocated string containing everything up to but not including the newline character. Ifreally_read_lineencounters EOF before the newline char, then a time span ofwait_timewill be used before the input operation is retried. If the descriptor is closed,Nonewill be returned.
type 'a read= ?parse_pos:Core.Sexp.Parse_pos.t -> 'a
val read_sexp : (t -> Core.Sexp.t Read_result.t Async_unix__.Import.Deferred.t) readread_sexp treads the next sexp.
val read_sexps : (t -> Core.Sexp.t Async_unix__.Import.Pipe.Reader.t) readread_sexps treads all the sexps and returns them as a pipe. When the reader reaches EOF or the pipe is closed,read_sexpscloses the reader, and then after the reader close is finished, closes the pipe.
val read_annotated_sexps : (t -> Core.Sexp.Annotated.t Async_unix__.Import.Pipe.Reader.t) readval read_bin_prot : ?max_len:int -> t -> 'a Bin_prot.Type_class.reader -> 'a Read_result.t Async_unix__.Import.Deferred.tread_bin_prot ?max_len t bp_readerreads the next binary protocol message using binary protocol readerbp_reader. The format is the "size-prefixed binary protocol", in which the length of the data is prefixed as a 64-bit integer to the data. This is the format thatWriter.write_bin_protwrites.For higher performance, consider
Unpack_sequence.unpack_bin_prot_from_reader.
val peek_bin_prot : ?max_len:int -> t -> 'a Bin_prot.Type_class.reader -> 'a Read_result.t Async_unix__.Import.Deferred.tSimilar to
read_bin_prot, but doesn't consume any bytes fromt.
val read_marshal_raw : t -> Core.Bytes.t Read_result.t Async_unix__.Import.Deferred.tread_marshal_rawreads and returns a buffer containing one marshaled value, but doesn't unmarshal it. You can just callMarshal.from_stringon the string, and cast it to the desired type (preferably the actual type). Similar toMarshal.from_channel, but suffers from the String-length limitation (16MB) on 32-bit platforms.
val read_marshal : t -> _ Read_result.t Async_unix__.Import.Deferred.tread_marshalis likeread_marshal_raw, but unmarshals the value after reading it.
val recv : t -> Core.Bytes.t Read_result.t Async_unix__.Import.Deferred.trecvreturns a string that was written withWriter.send.
val read_all : t -> (t -> 'a Read_result.t Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Pipe.Reader.tread_all t read_onereturns a pipe that receives all values read fromtby repeatedly usingread_one t. When the reader reaches EOF, it closes the reader, and then after the reader close is finished, closes the pipe.
val lseek : t -> int64 -> mode:[< `Set | `End ] -> int64 Async_unix__.Import.Deferred.tlseek t offset ~modeclearst's buffer and callsUnix.lseekont's file descriptor. The`Curmode is not exposed because seeking relative to the current position of the file descriptor is not the same as seeking relative to the current position of the reader.
val ltell : t -> int64 Async_unix__.Import.Deferred.tltell treturns the file position oftfrom the perspective of a consumer oft. It usesUnix.lseekto find the file position oft's underlying file descriptor, and then subtracts the number of bytes int's buffer that have been read from the OS but not fromt.
val lines : t -> string Async_unix__.Import.Pipe.Reader.tlines treads all the lines fromtand puts them in the pipe, one line per pipe element. The lines do not contain the trailing newline. When the reader reaches EOF or the pipe is closed,linescloses the reader, and then after the reader close is finished, closes the pipe.
val contents : t -> string Async_unix__.Import.Deferred.tcontents treturns the string corresponding to the full contents (up to EOF) of the reader.contentsclosestbefore returning the string.
val file_contents : string -> string Async_unix__.Import.Deferred.tfile_contents filereturns the string with the full contents of the file.
val file_lines : string -> string list Async_unix__.Import.Deferred.tfile_lines filereturns a list of the lines in the file. The lines do not contain the trailing newline.
type ('sexp, 'a, 'b) load= ?exclusive:bool -> string -> ('sexp -> 'a) -> 'b Async_unix__.Import.Deferred.t
val load_sexp : (Core.Sexp.t, 'a, 'a Core.Or_error.t) loadval load_sexp_exn : (Core.Sexp.t, 'a, 'a) loadval load_sexps : (Core.Sexp.t, 'a, 'a list Core.Or_error.t) loadval load_sexps_exn : (Core.Sexp.t, 'a, 'a list) loadval load_annotated_sexp : (Core.Sexp.Annotated.t, 'a, 'a Core.Or_error.t) loadval load_annotated_sexp_exn : (Core.Sexp.Annotated.t, 'a, 'a) loadval load_annotated_sexps : (Core.Sexp.Annotated.t, 'a, 'a list Core.Or_error.t) loadval load_annotated_sexps_exn : (Core.Sexp.Annotated.t, 'a, 'a list) load
type ('a, 'b) load_bin_prot= ?exclusive:bool -> ?max_len:int -> string -> 'a Bin_prot.Type_class.reader -> 'b Async_unix__.Import.Deferred.t
val load_bin_prot : ('a, 'a Core.Or_error.t) load_bin_protval load_bin_prot_exn : ('a, 'a) load_bin_prot