Module Async_unix__.Writer
module Id : Core.Unique_id
module Line_ending : sig ... end
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
include Async_unix__.Import.Invariant.S with type t := t
val invariant : t Base__.Invariant_intf.inv
val io_stats : Async_unix.Io_stats.t
Overall IO statistics for all writers.
val stdout : t Core.Lazy.t
stdout
andstderr
are writers for file descriptors 1 and 2. They are lazy because we don't want to create them in all programs that happen to link with Async.When either
stdout
orstderr
is created, they both are created. Furthermore, if they point to the same inode, then they will be the same writer toFd.stdout
. This can be confusing, becausefd (force stderr)
will beFd.stdout
, notFd.stderr
. And subsequent modifications ofFd.stderr
will have no effect onWriter.stderr
.Unfortunately, the sharing is necessary because Async uses OS threads to do
write()
syscalls using the writer buffer. When calling a program that redirects stdout and stderr to the same file, as in:foo.exe >/tmp/z.file 2>&1
if
Writer.stdout
andWriter.stderr
weren't the same writer, then they could have threads simultaneously writing to the same file, which could easily cause data loss.
val stderr : t Core.Lazy.t
type buffer_age_limit
=[
|
`At_most of Core.Time.Span.t
|
`Unlimited
]
val bin_shape_buffer_age_limit : Bin_prot.Shape.t
val bin_size_buffer_age_limit : buffer_age_limit Bin_prot.Size.sizer
val bin_write_buffer_age_limit : buffer_age_limit Bin_prot.Write.writer
val bin_writer_buffer_age_limit : buffer_age_limit Bin_prot.Type_class.writer
val bin_read_buffer_age_limit : buffer_age_limit Bin_prot.Read.reader
val __bin_read_buffer_age_limit__ : (int -> buffer_age_limit) Bin_prot.Read.reader
val bin_reader_buffer_age_limit : buffer_age_limit Bin_prot.Type_class.reader
val bin_buffer_age_limit : buffer_age_limit Bin_prot.Type_class.t
val sexp_of_buffer_age_limit : buffer_age_limit -> Ppx_sexp_conv_lib.Sexp.t
val buffer_age_limit_of_sexp : Ppx_sexp_conv_lib.Sexp.t -> buffer_age_limit
val __buffer_age_limit_of_sexp__ : Ppx_sexp_conv_lib.Sexp.t -> buffer_age_limit
val create : ?buf_len:int -> ?syscall:[ `Per_cycle | `Periodic of Core.Time.Span.t ] -> ?buffer_age_limit:buffer_age_limit -> ?raise_when_consumer_leaves:bool -> ?line_ending:Line_ending.t -> ?time_source:[> Core.read ] Async_unix__.Import.Time_source.T1.t -> Async_unix.Fd.t -> t
create ?buf_len ?syscall ?buffer_age_limit fd
creates a new writer. The file descriptorfd
should not be in use for writing by anything else.By default, a write system call occurs at the end of a cycle in which bytes were written. One can supply
~syscall:(`Periodic span)
to get better performance. This batches writes together, doing the write system call periodically according to the supplied span.A writer can asynchronously fail if the underlying write syscall returns an error, e.g.,
EBADF
,EPIPE
,ECONNRESET
, ....buffer_age_limit
specifies how backed up you can get before raising an exception. The default is`Unlimited
for files, and 2 minutes for other kinds of file descriptors. You can supply`Unlimited
to turn off buffer-age checks.raise_when_consumer_leaves
specifies whether the writer should raise an exception when the consumer receiving bytes from the writer leaves, i.e., in Unix, the write syscall returnsEPIPE
orECONNRESET
. Ifnot raise_when_consumer_leaves
, then the writer will silently drop all writes after the consumer leaves, and the writer will eventually fail with a writer-buffer-older-than error if the application remains open long enough.line_ending
determines hownewline
andwrite_line
terminate lines by default. Ifline_ending = Unix
then end of line is"\n"
; ifline_ending = Dos
then end of line is"\r\n"
. Note thatline_ending = Dos
is not equivalent to opening the file in text mode because any "\n" characters being printed by other means (e.g.,write "\n"
) are still written verbatim (in Unix style).time_source
is useful in tests to triggerbuffer_age_limit
-related conditions, or simply to have the result of (for example)flushed_time_ns
agree with your test's synthetic time. It is also used to schedule the`Periodic
syscalls.
val raise_when_consumer_leaves : t -> bool
val set_raise_when_consumer_leaves : t -> bool -> unit
set_raise_when_consumer_leaves t bool
sets theraise_when_consumer_leaves
flag oft
, which determies howt
responds to a write system call raisingEPIPE
andECONNRESET
(seecreate
).
val set_buffer_age_limit : t -> buffer_age_limit -> unit
set_buffer_age_limit t buffer_age_limit
replaces the existing buffer age limit with the new one. This is useful for stdout and stderr, which are lazily created in a context that does not allow applications to specifybuffer_age_limit
.
val consumer_left : t -> unit Async_unix__.Import.Deferred.t
consumer_left t
returns a deferred that becomes determined whent
attempts to write to a pipe that broke because the consumer on the other side left.
val of_out_channel : Core.Out_channel.t -> Async_unix.Fd.Kind.t -> t
val open_file : ?append:bool -> ?buf_len:int -> ?syscall:[ `Per_cycle | `Periodic of Core.Time.Span.t ] -> ?perm:int -> ?line_ending:Line_ending.t -> ?time_source:[> Core.read ] Async_unix__.Import.Time_source.T1.t -> string -> t Async_unix__.Import.Deferred.t
open_file file
opensfile
for writing and returns a writer for it. It usesUnix_syscalls.openfile
to open the file.
val with_file : ?perm:int -> ?append:bool -> ?exclusive:bool -> ?line_ending:Line_ending.t -> ?time_source:[> Core.read ] Async_unix__.Import.Time_source.T1.t -> string -> f:(t -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t
with_file ~file f
opensfile
for writing, creates a writert
, and runsf t
to obtain a deferredd
. Whend
becomes determined, the writer is closed. When the close completes, the result ofwith_file
becomes determined with the value ofd
.There is no need to call
Writer.flushed
to ensure thatwith_file
waits for the writer to be flushed before closing it.Writer.close
will already wait for the flush.
val fd : t -> Async_unix.Fd.t
fd
returns theFd.t
used to create this writer.
val set_fd : t -> Async_unix.Fd.t -> unit Async_unix__.Import.Deferred.t
set_fd t fd
sets thefd
used byt
for its underlying system calls. It first waits until everything being sent to the currentfd
is flushed. Of course, one must understand how the writer works and what one is doing to use this.
val write_gen : ?pos:int -> ?len:int -> t -> 'a -> blit_to_bigstring:('a, Core.Bigstring.t) Core.Blit.blit -> length:('a -> int) -> unit
write_gen t a
writesa
to writert
, withlength
specifying the number of bytes needed andblit_to_bigstring
blittinga
directly into thet
's buffer. If one has a type that haslength
andblit_to_bigstring
functions, like:module A : sig type t val length : t -> int val blit_to_bigstring : (t, Bigstring.t) Blit.blit end
then one can use
write_gen
to implement a custom analog ofWriter.write
, like:module Write_a : sig val write : ?pos:int -> ?len:int -> A.t -> Writer.t -> unit end = struct let write ?pos ?len a writer = Writer.write_gen ~length:A.length ~blit_to_bigstring:A.blit_to_bigstring ?pos ?len writer a end
In some cases it may be difficult to write only part of a value:
module B : sig type t val length : t -> int val blit_to_bigstring : t -> Bigstring.t -> pos:int -> unit end
In these cases, use
write_gen_whole
instead. It never requires writing only part of a value, although it is potentially less space-efficient. It may waste portions of previously-allocated write buffers if they are too small.module Write_b : sig val write : B.t -> Writer.t -> unit end = struct let write b writer = Writer.write_gen_whole ~length:B.length ~blit_to_bigstring:B.blit_to_bigstring writer b end
Note:
write_gen
andwrite_gen_whole
give you access to the writer's internal buffer. You should not capture it; doing so might lead to errors of the segfault kind.
val write_gen_whole : t -> 'a -> blit_to_bigstring:('a -> Core.Bigstring.t -> pos:int -> unit) -> length:('a -> int) -> unit
val write_direct : t -> f:(Core.Bigstring.t -> pos:int -> len:int -> 'a * int) -> 'a option
write_direct t ~f
givest
's internal buffer tof
.pos
andlen
define the portion of the buffer that can be filled.f
must return a pair(x, written)
wherewritten
is the number of bytes written to the buffer atpos
.write_direct
raises ifwritten < 0 || written > len
.write_direct
returnsSome x
, orNone
if the writer is stopped. By usingwrite_direct
only, one can ensure that the writer's internal buffer never grows. Look at thewrite_direct
expect tests for an example of how this can be used to construct awrite_string
like function that never grows the internal buffer.
val write_bytes : ?pos:int -> ?len:int -> t -> Core.Bytes.t -> unit
write ?pos ?len t s
adds a job to the writer's queue of pending writes. The contents of the string are copied to an internal buffer beforewrite
returns, so clients can do whatever they want withs
after that.
val write : ?pos:int -> ?len:int -> t -> string -> unit
val write_bigstring : ?pos:int -> ?len:int -> t -> Core.Bigstring.t -> unit
val write_iobuf : ?pos:int -> ?len:int -> t -> ([> Core.read ], _) Core.Iobuf.t -> unit
val write_substring : t -> Core.Substring.t -> unit
val write_bigsubstring : t -> Core.Bigsubstring.t -> unit
val writef : t -> ('a, unit, string, unit) Core.format4 -> 'a
val to_formatter : t -> Stdlib.Format.formatter
to_formatter
returns an OCaml-formatter that one can print to usingFormat
.fprintf. Note that flushing the formatter will only submit all buffered data to the writer, but does not guarantee flushing to the operating system.
val write_char : t -> char -> unit
write_char t c
writes the character.
val newline : ?line_ending:Line_ending.t -> t -> unit
newline t
writes the end-of-line terminator.line_ending
can overridet
'sline_ending
.
val write_line : ?line_ending:Line_ending.t -> t -> string -> unit
write_line t s ?line_ending
iswrite t s; newline t ?line_ending
.
val write_byte : t -> int -> unit
write_byte t i
writes one 8-bit integer (as the single character with that code). The given integer is taken modulo 256.
module Terminate_with : sig ... end
val write_sexp : ?hum:bool -> ?terminate_with:Terminate_with.t -> t -> Core.Sexp.t -> unit
write_sexp t sexp
writes tot
the string representation ofsexp
, possibly followed by a terminating character as perTerminate_with
. With~terminate_with:Newline
, the terminating character is a newline. With~terminate_with:Space_if_needed
, if a space is needed to ensure that the sexp reader knows that it has reached the end of the sexp, then the terminating character will be a space; otherwise, no terminating character is added. A terminating space is needed if the string representation doesn't end in')'
or'"'
.
val write_bin_prot : t -> 'a Bin_prot.Type_class.writer -> 'a -> unit
write_bin_prot
writes out a value using its bin_prot sizer/writer pair. The format is the "size-prefixed binary protocol", in which the length of the data is written before the data itself. This is the format thatReader.read_bin_prot
reads.
val write_bin_prot_no_size_header : t -> size:int -> 'a Bin_prot.Write.writer -> 'a -> unit
Writes out a value using its bin_prot writer. Unlike
write_bin_prot
, this doesn't prefix the output with the size of the bin_prot blob.size
is the expected size. This function will raise if the bin_prot writer writes an amount other thansize
bytes.
val schedule_bigstring : t -> ?pos:int -> ?len:int -> Core.Bigstring.t -> unit
schedule_bigstring t bstr
schedules a write of bigstringbstr
. It is not safe to change the bigstring until the writer has been successfully flushed or closed after this operation.
val schedule_bigsubstring : t -> Core.Bigsubstring.t -> unit
val schedule_iobuf_peek : t -> ?pos:int -> ?len:int -> ([> Core.read ], _) Core.Iobuf.t -> unit
schedule_iobuf_peek
is likeschedule_bigstring
, but for an iobuf. It is not safe to change the iobuf until the writer has been successfully flushed or closed after this operation.
val schedule_iobuf_consume : t -> ?len:int -> ([> Core.read ], Core.Iobuf.seek) Core.Iobuf.t -> unit Async_unix__.Import.Deferred.t
schedule_iobuf_consume
is likeschedule_iobuf_peek
, and additionally advances the iobuf beyond the portion that has been written. Until the result is determined, it is not safe to assume whether the iobuf has been advanced yet or not.
module Destroy_or_keep : sig ... end
val schedule_iovec : ?destroy_or_keep:Destroy_or_keep.t -> t -> Core.Bigstring.t Core.Unix.IOVec.t -> unit
schedule_iovec t iovec
schedules a write of I/O-vectoriovec
. It is not safe to change the bigstrings underlying the I/O-vector until the writer has been successfully flushed or closed after this operation.
val schedule_iovecs : t -> Core.Bigstring.t Core.Unix.IOVec.t Core.Queue.t -> unit
schedule_iovecs t iovecs
likeschedule_iovec
, but takes a whole queueiovecs
of I/O-vectors as argument. The queue is guaranteed to be empty when this function returns and can be modified. It is not safe to change the bigstrings underlying the I/O-vectors until the writer has been successfully flushed or closed after this operation.
val flushed : t -> unit Async_unix__.Import.Deferred.t
flushed t
returns a deferred that will become determined when all prior writes complete (i.e. thewrite()
system call returns). If a prior write fails, then the deferred will never become determined.It is OK to call
flushed t
aftert
has been closed.
val flushed_time : t -> Core.Time.t Async_unix__.Import.Deferred.t
val flushed_time_ns : t -> Core.Time_ns.t Async_unix__.Import.Deferred.t
val fsync : t -> unit Async_unix__.Import.Deferred.t
val fdatasync : t -> unit Async_unix__.Import.Deferred.t
val send : t -> string -> unit
send
writes a string to the writer that can be read back usingReader.recv
.
val monitor : t -> Async_unix__.Import.Monitor.t
monitor t
returns the writer's monitor.
val close : ?force_close:unit Async_unix__.Import.Deferred.t -> t -> unit Async_unix__.Import.Deferred.t
close ?force_close t
waits for the writer to be flushed, and then callsUnix.close
on the underlying file descriptor.force_close
causes theUnix.close
to happen even if the flush hangs. By defaultforce_close
isDeferred.never ()
for files andafter (sec 5)
for other types of file descriptors (e.g., sockets). If the close is forced, data in the writer's buffer may not be written to the file descriptor. You can check this by callingbytes_to_write
afterclose
finishes.WARNING:
force_close
will not reliably stop any write that is in progress. If there are any in-flight system calls, it will wait for them to finish, which includeswritev
, which can legitimately block forever.close
will raise an exception if theUnix.close
on the underlying file descriptor fails.You must call
close
on a writer in order to close the underlying file descriptor. Not doing so will cause a file descriptor leak. It also will cause a space leak, because until the writer is closed, it is held on to in order to flush the writer on shutdown.It is an error to call other operations on
t
afterclose t
has been called, except that calls ofclose
subsequent to the original call toclose
will return the same deferred as the original call.close_started t
becomes determined as soon asclose
is called.close_finished t
becomes determined aftert
's underlying file descriptor has been closed, i.e., it is the same as the result ofclose
.close_finished
differs fromclose
in that it does not have the side effect of initiating a close.is_closed t
returnstrue
iffclose t
has been called.is_open t
isnot (is_closed t)
with_close t ~f
runsf ()
, and closest
afterf
finishes or raises.
val close_started : t -> unit Async_unix__.Import.Deferred.t
val close_finished : t -> unit Async_unix__.Import.Deferred.t
val is_closed : t -> bool
val is_open : t -> bool
val with_close : t -> f:(unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t
val can_write : t -> bool
can_write t
returnstrue
if calls towrite*
functions ont
are allowed. Ifis_open t
thencan_write t
. But one can haveis_closed t
andcan_write t
, during the time afterclose t
before closing has finished.
val is_stopped_permanently : t -> bool
Errors raised within the writer can stop the background job that flushes out the writer's buffers.
is_stopped_permanently
returnstrue
when the background job has stopped.stopped_permanently
becomes determined when the background job has stopped.
val stopped_permanently : t -> unit Async_unix__.Import.Deferred.t
val with_flushed_at_close : t -> flushed:(unit -> unit Async_unix__.Import.Deferred.t) -> f:(unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t
In addition to flushing its internal buffer prior to closing, a writer keeps track of producers that are feeding it data, so that when
Writer.close
is called, it does the following:- requests that the writer's producers flush their data to it
- flushes the writer's internal buffer
- calls
Unix.close
on the writer's underlying file descriptor
with_flushed_at_close t ~flushed ~f
callsf
and addsflushed
to the set of producers that should be flushed-at-close, for the duration off
.
val bytes_to_write : t -> int
bytes_to_write t
returns how many bytes have been requested to write but have not yet been written.
val bytes_written : t -> Core.Int63.t
bytes_written t
returns how many bytes have been written.
val bytes_received : t -> Core.Int63.t
bytes_received t
returns how many bytes have been received by the writer. As long as the writer is running,bytes_received = bytes_written + bytes_to_write
.
val with_file_atomic : ?temp_file:string -> ?perm:Core.Unix.file_perm -> ?fsync:bool -> ?time_source:[> Core.read ] Async_unix__.Import.Time_source.T1.t -> string -> f:(t -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t
val save : ?temp_file:string -> ?perm:Core.Unix.file_perm -> ?fsync:bool -> string -> contents:string -> unit Async_unix__.Import.Deferred.t
val save_lines : ?temp_file:string -> ?perm:Core.Unix.file_perm -> ?fsync:bool -> string -> string list -> unit Async_unix__.Import.Deferred.t
save_lines file lines
writes all lines inlines
tofile
, with each line followed by a newline.
val save_sexp : ?temp_file:string -> ?perm:Core.Unix.file_perm -> ?fsync:bool -> ?hum:bool -> string -> Core.Sexp.t -> unit Async_unix__.Import.Deferred.t
save_sexp t sexp
writessexp
tot
, followed by a newline. To read a file produced usingsave_sexp
, one would typically useReader.load_sexp
, which deals with the additional whitespace and works nicely with converting the sexp to a value.
val save_sexps : ?temp_file:string -> ?perm:Core.Unix.file_perm -> ?fsync:bool -> ?hum:bool -> string -> Core.Sexp.t list -> unit Async_unix__.Import.Deferred.t
save_sexps
works similarly tosave_sexp
, but saves a sequence of sexps instead, separated by newlines. There is a correspondingReader.load_sexps
for reading back in.
val save_bin_prot : ?temp_file:string -> ?perm:Core.Unix.file_perm -> ?fsync:bool -> string -> 'a Bin_prot.Type_class.writer -> 'a -> unit Async_unix__.Import.Deferred.t
save_bin_prot t bin_writer 'a
writes'a
tot
using its bin_writer, in the size-prefixed format, likewrite_bin_prot
. To read a file produced usingsave_bin_prot
, one would typically useReader.load_bin_prot
.
val transfer' : ?stop:unit Async_unix__.Import.Deferred.t -> ?max_num_values_per_read:int -> t -> 'a Async_unix__.Import.Pipe.Reader.t -> ('a Core.Queue.t -> unit Async_unix__.Import.Deferred.t) -> unit Async_unix__.Import.Deferred.t
transfer' t pipe_r f
repeatedly reads values frompipe_r
and feeds them tof
, which should in turn write them tot
. It provides pushback topipe_r
by not reading whent
cannot keep up with the data being pushed in.By default, each read from
pipe_r
reads all the values inpipe_r
. One can supplymax_num_values_per_read
to limit the number of values per read.The
transfer'
stops and the result becomes determined whenstop
becomes determined, whenpipe_r
reaches its EOF, whent
is closed, or whent
's consumer leaves. In the latter two cases,transfer'
closespipe_r
.transfer'
causesPipe.flushed
onpipe_r
's writer to ensure that the bytes have been flushed tot
before returning. It also waits onPipe.upstream_flushed
at shutdown.transfer t pipe_r f
is equivalent to:transfer' t pipe_r (fun q -> Queue.iter q ~f; return ())
val transfer : ?stop:unit Async_unix__.Import.Deferred.t -> ?max_num_values_per_read:int -> t -> 'a Async_unix__.Import.Pipe.Reader.t -> ('a -> unit) -> unit Async_unix__.Import.Deferred.t
val pipe : t -> string Async_unix__.Import.Pipe.Writer.t
pipe t
returns the writing end of a pipe attached tot
that pushes back whent
cannot keep up with the data being pushed in. Closing the pipe does not closet
.
val of_pipe : ?time_source:[> Core.read ] Async_unix__.Import.Time_source.T1.t -> Core.Info.t -> string Async_unix__.Import.Pipe.Writer.t -> (t * [ `Closed_and_flushed_downstream of unit Async_unix__.Import.Deferred.t ]) Async_unix__.Import.Deferred.t
of_pipe info pipe_w
returns a writert
such that data written tot
will appear onpipe_w
. If eithert
orpipe_w
are closed, the other is closed as well.of_pipe
is implemented by attachingt
to the write-end of a Unix pipe, and shuttling bytes from the read-end of the Unix pipe topipe_w
.
val behave_nicely_in_pipeline : ?writers:t list -> unit -> unit
behave_nicely_in_pipeline ~writers ()
causes the program to silently exit with status 0 if any of the consumers ofwriters
go away. It also sets the buffer age to unlimited, in case there is a human (e.g., usingless
) on the other side of the pipeline.
val set_synchronous_out_channel : t -> Core.Out_channel.t -> unit Async_unix__.Import.Deferred.t
set_synchronous_out_channel t out_channel
waits untilbyte_to_write t = 0
, and then mutatest
so that all future writes tot
synchronously callOut_channel.output*
functions to send data to the OS immediately.set_synchronous_out_channel
is used by expect tests to ensure that the interleaving between calls toCore.printf
(and similar IO functions) andAsync.printf
generates output with the same interleaving.set_synchronous_out_channel
is idempotent.
val using_synchronous_backing_out_channel : t -> bool
using_synchronous_backing_out_channel t = true
if writes tot
are being done synchronously, e.g., due toset_synchronous_out_channel
,set_synchronous_backing_out_channel
,use_synchronous_stdout_and_stderr
.
val clear_synchronous_out_channel : t -> unit
clear_synchronous_out_channel t
restorest
to its normal state, with the background writer asynchronously feeding data to the OS.clear_synchronous_out_channel
is idempotent.
val with_synchronous_out_channel : t -> Core.Out_channel.t -> f:(unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t
val use_synchronous_stdout_and_stderr : unit -> unit Async_unix__.Import.Deferred.t
use_synchronous_stdout_and_stderr ()
causes all subsequent writes to stdout and stderr to occur synchronously (after any pending writes have flushed).This ensures
printf
-family writes happen immediately, which avoids two common sources of confusion:- unexpected interleaving of
Core.printf
andAsync.printf
calls; and Async.printf
calls that don't get flushed before an application exits
The disadvantages are:
- this makes writes blocking, which can delay unrelated asynchronous jobs until the consumer stops pushing back; and
- the errors raised by write are different and it won't respect
behave_nicely_in_pipeline
anymore
- unexpected interleaving of
module Backing_out_channel : sig ... end
Backing_out_channel
generalizesOut_channel
to a narrow interface that can be used to collect strings, etc.
val set_synchronous_backing_out_channel : t -> Backing_out_channel.t -> unit Async_unix__.Import.Deferred.t
val with_synchronous_backing_out_channel : t -> Backing_out_channel.t -> f:(unit -> 'a Async_unix__.Import.Deferred.t) -> 'a Async_unix__.Import.Deferred.t