module Id : Core.Unique_id
module Line_ending : sig ... end
include sig ... end
val sexp_of_t : t ‑> Sexplib.Sexp.t
val stdout : t Core.Lazy.t
stdout and stderr are writers for file descriptors 1 and 2. They are lazy because we don't want to create them in all programs that happen to link with Async.
When either stdout or stderr is created, they both are created. Furthermore, if they point to the same inode, then they will be the same writer to Fd.stdout. This can be confusing, because fd (force stderr) will be Fd.stdout, not Fd.stderr. And subsequent modifications of Fd.stderr will have no effect on Writer.stderr.
Unfortunately, the sharing is necessary because Async uses OS threads to do write() syscalls using the writer buffer. When calling a program that redirects stdout and stderr to the same file, as in:
foo.exe >/tmp/z.file 2>&1
if Writer.stdout and Writer.stderr weren't the same writer, then they could have threads simultaneously writing to the same file, which could easily cause data loss.
val stderr : t Core.Lazy.t
include sig ... end
val buffer_age_limit_of_sexp : Sexplib.Sexp.t ‑> buffer_age_limit
val __buffer_age_limit_of_sexp__ : Sexplib.Sexp.t ‑> buffer_age_limit
val sexp_of_buffer_age_limit : buffer_age_limit ‑> Sexplib.Sexp.t
val bin_buffer_age_limit : buffer_age_limit Bin_prot.Type_class.t
val bin_read_buffer_age_limit : buffer_age_limit Bin_prot.Read.reader
val __bin_read_buffer_age_limit__ : (int ‑> buffer_age_limit) Bin_prot.Read.reader
val bin_reader_buffer_age_limit : buffer_age_limit Bin_prot.Type_class.reader
val bin_size_buffer_age_limit : buffer_age_limit Bin_prot.Size.sizer
val bin_write_buffer_age_limit : buffer_age_limit Bin_prot.Write.writer
val bin_writer_buffer_age_limit : buffer_age_limit Bin_prot.Type_class.writer
val bin_shape_buffer_age_limit : Bin_prot.Shape.t
val create : ?buf_len:int ‑> ?syscall:[ `Per_cycle | `Periodic of Core.Time.Span.t ] ‑> ?buffer_age_limit:buffer_age_limit ‑> ?raise_when_consumer_leaves:bool ‑> ?line_ending:Line_ending.t ‑> Fd.t ‑> t
create ?buf_len ?syscall ?buffer_age_limit fd creates a new writer. The file descriptor fd should not be in use for writing by anything else.
By default, a write system call occurs at the end of a cycle in which bytes were written. One can supply ~syscall:(`Periodic span) to get better performance. This batches writes together, doing the write system call periodically according to the supplied span.
A writer can asynchronously fail if the underlying write syscall returns an error, e.g. EBADF, EPIPE, ECONNRESET, ....
buffer_age_limit specifies how old unwritten data in the writer's buffer may become before the writer raises an exception. The default is `Unlimited for files, and 2 minutes for other kinds of file descriptors. You can supply `Unlimited to turn off buffer-age checks.
raise_when_consumer_leaves specifies whether the writer should raise an exception when the consumer receiving bytes from the writer leaves, i.e. in Unix, the write syscall returns EPIPE or ECONNRESET. If raise_when_consumer_leaves is false, then the writer will silently drop all writes after the consumer leaves, and the writer will eventually fail with a writer-buffer-older-than error if the application remains open long enough.
line_ending determines how newline and write_line terminate lines by default. If line_ending = Unix then end of line is "\n"; if line_ending = Dos then end of line is "\r\n". Note that line_ending = Dos is not equivalent to opening the file in text mode, because any "\n" characters being printed by other means (e.g. write "\n") are still written verbatim (in Unix style).
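The difference between the line terminator and embedded "\n" characters can be modeled without Async. The sketch below is a standard-library stand-in, not Writer's implementation: the line_ending type, terminator, write_line, and demo are hypothetical names that only mirror the behavior described above.

```ocaml
(* Hypothetical stand-in for Line_ending.t, with the two constructors named above. *)
type line_ending = Unix | Dos

let terminator = function
  | Unix -> "\n"
  | Dos -> "\r\n"

(* Model of write_line: [s] is copied verbatim, then the terminator is appended.
   Embedded "\n" characters in [s] are not translated. *)
let write_line buf ~line_ending s =
  Buffer.add_string buf s;
  Buffer.add_string buf (terminator line_ending)

let demo () =
  let buf = Buffer.create 16 in
  write_line buf ~line_ending:Dos "a\nb";
  Buffer.contents buf
```

Here demo () yields "a\nb\r\n": only the final terminator follows the Dos convention, while the "\n" inside the string is left as is.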
val raise_when_consumer_leaves : t ‑> bool
val set_raise_when_consumer_leaves : t ‑> bool ‑> unit
set_raise_when_consumer_leaves t bool sets the raise_when_consumer_leaves flag of t, which determines how t responds to a write system call failing with EPIPE or ECONNRESET (see create).
val set_buffer_age_limit : t ‑> buffer_age_limit ‑> unit
set_buffer_age_limit t buffer_age_limit replaces the existing buffer age limit with the new one. This is useful for stdout and stderr, which are lazily created in a context that does not allow applications to specify buffer_age_limit.
val consumer_left : t ‑> unit Import.Deferred.t
consumer_left t returns a deferred that becomes determined when t attempts to write to a pipe that broke because the consumer on the other side left.
val of_out_channel : Core.Out_channel.t ‑> Fd.Kind.t ‑> t
val open_file : ?append:bool ‑> ?buf_len:int ‑> ?perm:int ‑> ?line_ending:Line_ending.t ‑> string ‑> t Import.Deferred.t
open_file file opens file for writing and returns a writer for it. It uses Unix_syscalls.openfile to open the file.
val with_file : ?perm:int ‑> ?append:bool ‑> ?exclusive:bool ‑> ?line_ending:Line_ending.t ‑> string ‑> f:(t ‑> 'a Import.Deferred.t) ‑> 'a Import.Deferred.t
with_file file ~f opens file for writing, creates a writer t, and runs f t to obtain a deferred d. When d becomes determined, the writer is closed. When the close completes, the result of with_file becomes determined with the value of d.
There is no need to call Writer.flushed to ensure that with_file waits for the writer to be flushed before closing it. Writer.close will already wait for the flush.
val set_fd : t ‑> Fd.t ‑> unit Import.Deferred.t
set_fd t fd sets the fd used by t for its underlying system calls. It first waits until everything being sent to the current fd is flushed. Of course, one must understand how the writer works and what one is doing to use this.
val write_gen : ?pos:int ‑> ?len:int ‑> t ‑> 'a ‑> blit_to_bigstring:('a, Core.Bigstring.t) Core.Blit.blit ‑> length:('a ‑> int) ‑> unit
write_gen t a writes a to writer t, with length specifying the number of bytes needed and blit_to_bigstring blitting a directly into t's buffer. If one has a type that has length and blit_to_bigstring functions, like:
module A : sig
  type t
  val length : t -> int
  val blit_to_bigstring : (t, Bigstring.t) Blit.blit
end
then one can use write_gen to implement a custom analog of Writer.write, like:
module Write_a : sig
  val write : ?pos:int -> ?len:int -> A.t -> Writer.t -> unit
end = struct
  let write ?pos ?len a writer =
    Writer.write_gen
      ~length:A.length
      ~blit_to_bigstring:A.blit_to_bigstring
      ?pos ?len writer a
end
In some cases it may be difficult to write only part of a value:
module B : sig
  type t
  val length : t -> int
  val blit_to_bigstring : t -> Bigstring.t -> pos:int -> unit
end
In these cases, use write_gen_whole instead. It never requires writing only part of a value, although it is potentially less space-efficient. It may waste portions of previously-allocated write buffers if they are too small.
module Write_b : sig
  val write : B.t -> Writer.t -> unit
end = struct
  let write b writer =
    Writer.write_gen_whole
      ~length:B.length
      ~blit_to_bigstring:B.blit_to_bigstring
      writer b
end
Note: write_gen and write_gen_whole give you access to the writer's internal buffer. You should not capture it; doing so might lead to errors of the segfault kind.
val write_gen_whole : t ‑> 'a ‑> blit_to_bigstring:('a ‑> Core.Bigstring.t ‑> pos:int ‑> unit) ‑> length:('a ‑> int) ‑> unit
val write_direct : t ‑> f:(Core.Bigstring.t ‑> pos:int ‑> len:int ‑> 'a * int) ‑> 'a option
write_direct t ~f gives t's internal buffer to f. pos and len define the portion of the buffer that can be filled. f must return a pair (x, written) where written is the number of bytes written to the buffer at pos. write_direct raises if written < 0 || written > len. write_direct returns Some x, or None if the writer is stopped. By using write_direct only, one can ensure that the writer's internal buffer never grows. Look at the write_direct expect tests for an example of how this can be used to construct a write_string-like function that never grows the internal buffer.
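The contract between write_direct and f can be illustrated with a small standard-library model. This is only a sketch under stated assumptions: it uses Bytes.t in place of a bigstring, a hypothetical fixed-capacity record buf, and it leaves out the Some/None result for stopped writers. It is not the expect test mentioned above.

```ocaml
(* Hypothetical model of a writer buffer: fixed capacity, filled up to [back]. *)
type buf = { data : Bytes.t; mutable back : int }

(* Hands the free region [pos, pos + len) to [f] and validates what [f] reports. *)
let write_direct t ~f =
  let pos = t.back in
  let len = Bytes.length t.data - pos in
  let x, written = f t.data ~pos ~len in
  if written < 0 || written > len then invalid_arg "write_direct: bad written";
  t.back <- pos + written;
  x

(* A write_string-like helper that never grows the buffer: it copies as much
   of [s] as fits and returns the number of bytes copied. *)
let write_string_prefix t s =
  write_direct t ~f:(fun data ~pos ~len ->
    let n = min len (String.length s) in
    Bytes.blit_string s 0 data pos n;
    (n, n))
```

With a 4-byte buffer, write_string_prefix t "abcdef" copies "abcd" and returns 4; a caller would keep the remaining "ef" and retry once space is available.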
val write : ?pos:int ‑> ?len:int ‑> t ‑> string ‑> unit
write ?pos ?len t s adds a job to the writer's queue of pending writes. The contents of the string are copied to an internal buffer before write returns, so clients can do whatever they want with s after that.
val write_bigstring : ?pos:int ‑> ?len:int ‑> t ‑> Core.Bigstring.t ‑> unit
val write_iobuf : ?pos:int ‑> ?len:int ‑> t ‑> ([> Core.read ], _) Core.Iobuf.t ‑> unit
val write_substring : t ‑> Core.Substring.t ‑> unit
val write_bigsubstring : t ‑> Core.Bigsubstring.t ‑> unit
val writef : t ‑> ('a, unit, string, unit) Core.format4 ‑> 'a
val to_formatter : t ‑> Format.formatter
to_formatter t
val newline : ?line_ending:Line_ending.t ‑> t ‑> unit
newline t writes the end-of-line terminator. line_ending can override t's line_ending.
val write_line : ?line_ending:Line_ending.t ‑> t ‑> string ‑> unit
write_line ?line_ending t s is write t s; newline ?line_ending t.
val write_byte : t ‑> int ‑> unit
write_byte t i writes one 8-bit integer (as the single character with that code). The given integer is taken modulo 256.
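As a quick illustration of the modulo-256 rule, here is a standard-library model. It assumes that "modulo 256" means keeping the low 8 bits, which is what land 0xFF does, so negative inputs map into 0..255. byte_of_int and write_byte_model are hypothetical names, not part of Writer.

```ocaml
(* Keep only the low 8 bits of [i]; the result is always a valid char code. *)
let byte_of_int i = Char.chr (i land 0xFF)

(* Model of write_byte on a plain Buffer.t instead of a Writer.t. *)
let write_byte_model buf i = Buffer.add_char buf (byte_of_int i)
```

For example, 65 and 321 both produce 'A', since 321 = 256 + 65.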
module Terminate_with : sig ... end
val write_sexp : ?hum:bool ‑> ?terminate_with:Terminate_with.t ‑> t ‑> Core.Sexp.t ‑> unit
write_sexp t sexp writes to t the string representation of sexp, possibly followed by a terminating character as per Terminate_with. With ~terminate_with:Newline, the terminating character is a newline. With ~terminate_with:Space_if_needed, if a space is needed to ensure that the sexp reader knows that it has reached the end of the sexp, then the terminating character will be a space; otherwise, no terminating character is added. A terminating space is needed if the string representation doesn't end in ')' or '"'.
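The Space_if_needed rule stated above is a check on the last character of the printed sexp. A minimal sketch of that check, using only the standard library (needs_terminating_space is a hypothetical helper, not a function exported by Writer):

```ocaml
(* True when the printed sexp [s] must be followed by a space so that a reader
   can tell where it ends. A printed sexp is never empty, so the empty case is
   only defensive. *)
let needs_terminating_space s =
  let n = String.length s in
  n > 0
  &&
  match s.[n - 1] with
  | ')' | '"' -> false
  | _ -> true
```

A bare atom such as foo needs the space, while (a b) and "a b" are self-delimiting and do not.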
val write_bin_prot : t ‑> 'a Bin_prot.Type_class.writer ‑> 'a ‑> unit
write_bin_prot writes out a value using its bin_prot sizer/writer pair. The format is the "size-prefixed binary protocol", in which the length of the data is written before the data itself. This is the format that Reader.read_bin_prot reads.
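The idea of a size prefix can be sketched with the standard library alone. The 8-byte little-endian header used here is an assumption made for illustration; the text above only says that the length precedes the data. frame and unframe are hypothetical helpers that operate on an already-serialized payload.

```ocaml
(* Assumed header width in bytes (an illustration, not taken from the text above). *)
let header_len = 8

(* Prefix [payload] with its length, encoded as a little-endian 64-bit integer. *)
let frame payload =
  let len = String.length payload in
  let b = Bytes.create (header_len + len) in
  Bytes.set_int64_le b 0 (Int64.of_int len);
  Bytes.blit_string payload 0 b header_len len;
  Bytes.to_string b

(* Read the length header back and extract exactly that many payload bytes. *)
let unframe s =
  let len = Int64.to_int (Bytes.get_int64_le (Bytes.of_string s) 0) in
  String.sub s header_len len
```

Because the reader learns the payload length first, it knows exactly how many bytes to wait for before decoding.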
val write_bin_prot_no_size_header : t ‑> size:int ‑> 'a Bin_prot.Write.writer ‑> 'a ‑> unit
Writes out a value using its bin_prot writer. Unlike write_bin_prot, this doesn't prefix the output with the size of the bin_prot blob. size is the expected size. This function will raise if the bin_prot writer writes an amount other than size bytes.
val write_marshal : t ‑> flags:Marshal.extern_flags list ‑> _ ‑> unit
Serializes data using marshal and writes it to the writer.
Unlike the write_ functions, all functions starting with schedule_ require flushing or closing of the writer after returning before it is safe to modify the bigstrings which were directly or indirectly passed to these functions. The reason is that these bigstrings will be read from directly when writing; their contents are not copied to internal buffers.
This is important if users need to send the same large data string to a huge number of clients simultaneously (e.g. on a cluster), because these functions then avoid needlessly exhausting memory by sharing the data.
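The difference between copying (write_) and sharing (schedule_) can be modeled with the standard library. This is a hypothetical toy writer, with Bytes.t standing in for a bigstring; job, writer, write, schedule, and run_flush are illustrative names and do not reflect Writer's internals.

```ocaml
type job =
  | Copied of string   (* write: contents captured at call time *)
  | Shared of Bytes.t  (* schedule: buffer is read later, at flush time *)

type writer = { pending : job Queue.t; out : Buffer.t }

let create () = { pending = Queue.create (); out = Buffer.create 64 }
let write t b = Queue.add (Copied (Bytes.to_string b)) t.pending
let schedule t b = Queue.add (Shared b) t.pending

(* Emit all pending jobs in order, reading shared buffers only now. *)
let run_flush t =
  Queue.iter
    (function
      | Copied s -> Buffer.add_string t.out s
      | Shared b -> Buffer.add_bytes t.out b)
    t.pending;
  Queue.clear t.pending

let demo () =
  let t = create () in
  let b = Bytes.of_string "aaa" in
  write t b;
  schedule t b;
  Bytes.fill b 0 3 'z'; (* mutate the buffer before the flush *)
  run_flush t;
  Buffer.contents t.out
```

demo () returns "aaazzz": the copied write is unaffected by the later mutation, while the scheduled one observes it. This is why a scheduled buffer must stay unchanged until the writer has been flushed or closed.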
val schedule_bigstring : t ‑> ?pos:int ‑> ?len:int ‑> Core.Bigstring.t ‑> unit
schedule_bigstring t bstr schedules a write of bigstring bstr. It is not safe to change the bigstring until the writer has been successfully flushed or closed after this operation.
val schedule_bigsubstring : t ‑> Core.Bigsubstring.t ‑> unit
val schedule_iobuf_peek : t ‑> ?pos:int ‑> ?len:int ‑> ([> Core.read ], _) Core.Iobuf.t ‑> unit
schedule_iobuf_peek is like schedule_bigstring, but for an iobuf. It is not safe to change the iobuf until the writer has been successfully flushed or closed after this operation.
val schedule_iobuf_consume : t ‑> ?len:int ‑> ([> Core.read ], Core.Iobuf.seek) Core.Iobuf.t ‑> unit Import.Deferred.t
schedule_iobuf_consume is like schedule_iobuf_peek, and additionally advances the iobuf beyond the portion that has been written. Until the result is determined, it is not safe to assume whether the iobuf has been advanced yet or not.
val schedule_iovec : t ‑> Core.Bigstring.t Core.Unix.IOVec.t ‑> unit
schedule_iovec t iovec schedules a write of I/O-vector iovec. It is not safe to change the bigstrings underlying the I/O-vector until the writer has been successfully flushed or closed after this operation.
val schedule_iovecs : t ‑> Core.Bigstring.t Core.Unix.IOVec.t Core.Queue.t ‑> unit
schedule_iovecs t iovecs is like schedule_iovec, but takes a whole queue iovecs of I/O-vectors as argument. The queue is guaranteed to be empty when this function returns and can be modified. It is not safe to change the bigstrings underlying the I/O-vectors until the writer has been successfully flushed or closed after this operation.
val flushed : t ‑> unit Import.Deferred.t
flushed t returns a deferred that will become determined when all prior writes complete (i.e. the write() system call returns). If a prior write fails, then the deferred will never become determined.
It is OK to call flushed t after t has been closed.
val flushed_time : t ‑> Core.Time.t Import.Deferred.t
val flushed_time_ns : t ‑> Core.Time_ns.t Import.Deferred.t
val fsync : t ‑> unit Import.Deferred.t
val fdatasync : t ‑> unit Import.Deferred.t
val send : t ‑> string ‑> unit
send t s writes a string to the channel that can be read back using Reader.recv.
val close : ?force_close:unit Import.Deferred.t ‑> t ‑> unit Import.Deferred.t
close ?force_close t waits for the writer to be flushed, and then calls Unix.close on the underlying file descriptor. force_close causes the Unix.close to happen even if the flush hangs. By default force_close is Deferred.never () for files and after (sec 5) for other types of file descriptors (e.g. sockets). If the close is forced, data in the writer's buffer may not be written to the file descriptor. You can check this by calling bytes_to_write after close finishes.
WARNING: force_close will not reliably stop any write that is in progress. If there are any in-flight system calls, it will wait for them to finish, which includes writev, which can legitimately block forever.
close will raise an exception if the Unix.close on the underlying file descriptor fails.
It is required to call close on a writer in order to close the underlying file descriptor. Not doing so will cause a file descriptor leak. It also will cause a space leak, because until the writer is closed, it is held on to in order to flush the writer on shutdown.
It is an error to call other operations on t after close t has been called, except that calls of close subsequent to the original call to close will return the same deferred as the original call.
close_started t becomes determined as soon as close is called.
close_finished t becomes determined after t's underlying file descriptor has been closed, i.e. it is the same as the result of close. close_finished differs from close in that it does not have the side effect of initiating a close.
is_closed t returns true iff close t has been called.
is_open t is not (is_closed t).
with_close t ~f runs f (), and closes t after f finishes or raises.
val close_started : t ‑> unit Import.Deferred.t
val close_finished : t ‑> unit Import.Deferred.t
val is_closed : t ‑> bool
val is_open : t ‑> bool
val with_close : t ‑> f:(unit ‑> 'a Import.Deferred.t) ‑> 'a Import.Deferred.t
val can_write : t ‑> bool
can_write t returns true if calls to write* functions on t are allowed. If is_open t then can_write t. But one can have is_closed t and can_write t, during the time after close t before closing has finished.
val is_stopped_permanently : t ‑> bool
Errors raised within the writer can stop the background job that flushes out the writer's buffers. is_stopped_permanently returns true when the background job has stopped. stopped_permanently becomes determined when the background job has stopped.
val stopped_permanently : t ‑> unit Import.Deferred.t
val with_flushed_at_close : t ‑> flushed:(unit ‑> unit Import.Deferred.t) ‑> f:(unit ‑> 'a Import.Deferred.t) ‑> 'a Import.Deferred.t
In addition to flushing its internal buffer prior to closing, a writer keeps track of producers that are feeding it data, so that when Writer.close is called, it does the following:
1. requests that the writer's producers flush their data to it
2. flushes the writer's internal buffer
3. calls Unix.close on the writer's underlying file descriptor
with_flushed_at_close t ~flushed ~f calls f and adds flushed to the set of producers that should be flushed-at-close, for the duration of f.
val bytes_to_write : t ‑> int
bytes_to_write t returns how many bytes have been requested to write but have not yet been written.
val bytes_received : t ‑> Core.Int63.t
bytes_received t returns how many bytes have been received by the writer. As long as the writer is running, bytes_received = bytes_written + bytes_to_write.
with_file_atomic ?temp_file ?perm ?fsync file ~f creates a writer to a temp file, feeds that writer to f, and when the result of f becomes determined, atomically moves (i.e. uses Unix.rename) the temp file to file. If file currently exists, it will be replaced, even if it is read only. The temp file will be file (or temp_file if supplied) suffixed by a unique random sequence of six characters. The temp file may need to be removed in case of a crash so it may be prudent to choose a temp file that can be easily found by cleanup tools.
If fsync is true, the temp file will be flushed to disk before it takes the place of the target file, thus guaranteeing that the target file will always be in a sound state, even after a machine crash. Since synchronization is extremely slow, this is not the default. Think carefully about the event of machine crashes and whether you may need this option!
We intend for with_file_atomic to preserve the behavior of the open system call, so if file does not exist, we will apply the umask to perm. If file does exist, perm will default to the file's current permissions rather than 0o666.
save is a special case of with_file_atomic that atomically writes the given string to the specified file.
save_sexp is a special case of with_file_atomic that atomically writes the given sexp to the specified file.
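The write-to-temp-then-rename technique behind with_file_atomic can be sketched synchronously with the standard library. This is a simplified model under stated assumptions, not Async's implementation: save_atomic is a hypothetical name, the temp file name comes from Filename.temp_file rather than the six-character suffix described above, and the optional fsync and permission handling are left out.

```ocaml
(* Write [contents] to a temp file in the same directory as [file], then
   rename it over [file]. Keeping both in one directory keeps the rename on
   a single filesystem. *)
let save_atomic ~file ~contents =
  let tmp =
    Filename.temp_file ~temp_dir:(Filename.dirname file)
      (Filename.basename file) ".tmp"
  in
  (try
     let oc = open_out_bin tmp in
     Fun.protect
       ~finally:(fun () -> close_out_noerr oc)
       (fun () ->
         output_string oc contents;
         flush oc)
   with e ->
     (* On failure, remove the temp file and leave [file] untouched. *)
     Sys.remove tmp;
     raise e);
  Sys.rename tmp file
```

Readers of file see either the old contents or the new ones, never a partially written file. Without an fsync step this guarantee does not extend to machine crashes, as noted above.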
val with_file_atomic : ?temp_file:string ‑> ?perm:Core.Unix.file_perm ‑> ?fsync:bool ‑> string ‑> f:(t ‑> 'a Import.Deferred.t) ‑> 'a Import.Deferred.t
val save : ?temp_file:string ‑> ?perm:Core.Unix.file_perm ‑> ?fsync:bool ‑> string ‑> contents:string ‑> unit Import.Deferred.t
val save_lines : ?temp_file:string ‑> ?perm:Core.Unix.file_perm ‑> ?fsync:bool ‑> string ‑> string list ‑> unit Import.Deferred.t
save_lines file lines writes all lines in lines to file, with each line followed by a newline.
val save_sexp : ?temp_file:string ‑> ?perm:Core.Unix.file_perm ‑> ?fsync:bool ‑> ?hum:bool ‑> string ‑> Core.Sexp.t ‑> unit Import.Deferred.t
save_sexp file sexp writes sexp to file, followed by a newline. To read a file produced using save_sexp, one would typically use Reader.load_sexp, which deals with the additional whitespace and works nicely with converting the sexp to a value.
val save_sexps : ?temp_file:string ‑> ?perm:Core.Unix.file_perm ‑> ?fsync:bool ‑> ?hum:bool ‑> string ‑> Core.Sexp.t list ‑> unit Import.Deferred.t
save_sexps works similarly to save_sexp, but saves a sequence of sexps instead, separated by newlines. There is a corresponding Reader.load_sexps for reading back in.
val save_bin_prot : ?temp_file:string ‑> ?perm:Core.Unix.file_perm ‑> ?fsync:bool ‑> string ‑> 'a Bin_prot.Type_class.writer ‑> 'a ‑> unit Import.Deferred.t
save_bin_prot file bin_writer a writes a to file using its bin_writer, in the size-prefixed format, like write_bin_prot. To read a file produced using save_bin_prot, one would typically use Reader.load_bin_prot.
val transfer' : ?stop:unit Import.Deferred.t ‑> ?max_num_values_per_read:int ‑> t ‑> 'a Import.Pipe.Reader.t ‑> ('a Core.Queue.t ‑> unit Import.Deferred.t) ‑> unit Import.Deferred.t
transfer' t pipe_r f repeatedly reads values from pipe_r and feeds them to f, which should in turn write them to t. It provides pushback to pipe_r by not reading when t cannot keep up with the data being pushed in.
By default, each read from pipe_r reads all the values in pipe_r. One can supply max_num_values_per_read to limit the number of values per read.
The transfer' stops and the result becomes determined when stop becomes determined, when pipe_r reaches its EOF, when t is closed, or when t's consumer leaves. In the latter two cases, transfer' closes pipe_r.
transfer' causes Pipe.flushed on pipe_r's writer to ensure that the bytes have been flushed to t before returning. It also waits on Pipe.upstream_flushed at shutdown.
transfer t pipe_r f is equivalent to:
transfer' t pipe_r (fun q -> Queue.iter q ~f; return ())
val transfer : ?stop:unit Import.Deferred.t ‑> ?max_num_values_per_read:int ‑> t ‑> 'a Import.Pipe.Reader.t ‑> ('a ‑> unit) ‑> unit Import.Deferred.t
val pipe : t ‑> string Import.Pipe.Writer.t
pipe t returns the writing end of a pipe attached to t that pushes back when t cannot keep up with the data being pushed in. Closing the pipe does not close t.
val of_pipe : Core.Info.t ‑> string Import.Pipe.Writer.t ‑> (t * [ `Closed_and_flushed_downstream of unit Import.Deferred.t ]) Import.Deferred.t
of_pipe info pipe_w returns a writer t such that data written to t will appear on pipe_w. If either t or pipe_w is closed, the other is closed as well.
of_pipe is implemented by attaching t to the write-end of a Unix pipe, and shuttling bytes from the read-end of the Unix pipe to pipe_w.
val behave_nicely_in_pipeline : ?writers:t list ‑> unit ‑> unit
behave_nicely_in_pipeline ~writers () causes the program to exit silently with status zero if any of the consumers of writers go away. It also sets the buffer age to unlimited, in case there is a human (e.g. using less) on the other side of the pipeline.