Module Async_udp
A grab-bag of performance-oriented, UDP-oriented network tools. These provide some convenience, but they are more complex than basic applications require.
Defaults are chosen for typical UDP applications. Buffering is via Iobuf conventions, where a typical packet-handling loop iteration is read -> flip_lo -> process -> reset.
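The read -> flip_lo -> process -> reset cycle can be pictured with a toy model of a windowed buffer. This is only a sketch using the OCaml standard library: `toy_buf`, `fill`, `contents` and `handle_packet` are hypothetical names that mimic the window arithmetic, and none of them belong to Core.Iobuf or Async_udp.

```ocaml
(* Toy model of an Iobuf-style window over a fixed byte buffer.
   This is NOT Core.Iobuf; [toy_buf] and its functions are hypothetical
   stand-ins that only mimic the window arithmetic. *)
type toy_buf = {
  data : Bytes.t;
  mutable lo : int;  (* start of the current window *)
  mutable hi : int;  (* end of the current window (exclusive) *)
}

let create capacity = { data = Bytes.create capacity; lo = 0; hi = capacity }

(* "read": copy an incoming packet into the window, advancing [lo] past it. *)
let fill t packet =
  let len = String.length packet in
  if len > t.hi - t.lo then invalid_arg "fill: packet larger than window";
  Bytes.blit_string packet 0 t.data t.lo len;
  t.lo <- t.lo + len

(* "flip_lo": narrow the window to the bytes just written, [0, old lo). *)
let flip_lo t =
  t.hi <- t.lo;
  t.lo <- 0

(* "process": view the window contents; a real callback would parse them. *)
let contents t = Bytes.sub_string t.data t.lo (t.hi - t.lo)

(* "reset": widen the window back to full capacity for the next packet. *)
let reset t =
  t.lo <- 0;
  t.hi <- Bytes.length t.data

(* One loop iteration: read -> flip_lo -> process -> reset. *)
let handle_packet t packet process =
  fill t packet;
  flip_lo t;
  let result = process (contents t) in
  reset t;
  result
```

After each call to `handle_packet`, the window again spans the whole buffer, so the same buffer can take the next packet without reallocation.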
While these functions are oriented toward UDP, they work with any files that satisfy Fd.supports_nonblock.
For zero-copy Bigstring.t transfers, we must ensure no buffering between the receive loop and caller. So an interface like Tcp.connect, with something like (Bigstring.t * Socket.Address.Inet.t) Pipe.Reader.t, won't work. Instead, we use synchronous callbacks.
type write_buffer = (Core.read_write, Core.Iobuf.seek) Core.Iobuf.t
val default_capacity : int
The default buffer capacity for UDP-oriented buffers is 1472, determined as the typical Ethernet MTU (1500 octets) less the typical combined IPv4 and UDP header length (20 + 8 = 28 octets). Buffers of this size avoid accidentally creating messages that will be dropped on send because they exceed the MTU, and can receive the largest corresponding UDP message.
While this number is merely typical and not guaranteed to work in all cases, defining it in one place makes it easy to share and change. For example, another MTU in common use is 9000 for Jumbo frames, so the value of default_capacity might change to 8972 in the future.
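The arithmetic behind both figures can be written out as a short sketch. It assumes the 28 octets of overhead split into a 20-octet IPv4 header without options and an 8-octet UDP header; `udp_payload_capacity` is a hypothetical helper for illustration and is not part of Async_udp.

```ocaml
(* Header sizes are the usual minimums: IPv4 without options, plus UDP. *)
let ipv4_header_bytes = 20
let udp_header_bytes = 8

(* [udp_payload_capacity] is a hypothetical helper, not part of Async_udp. *)
let udp_payload_capacity ~mtu = mtu - ipv4_header_bytes - udp_header_bytes

let () =
  (* Standard Ethernet MTU: prints 1472 *)
  Printf.printf "Ethernet: %d\n" (udp_payload_capacity ~mtu:1500);
  (* Jumbo-frame MTU: prints 8972 *)
  Printf.printf "Jumbo:    %d\n" (udp_payload_capacity ~mtu:9000)
```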
module Config : sig ... end
A typical receive loop implicitly calls Iobuf.flip_lo before calling its callback to prepare a packet buffer for reading by the callback and Iobuf.reset afterward to prepare for the next iteration.
val sendto_sync : unit -> (Async.Fd.t -> ([> Core.read ], Core.Iobuf.seek) Core.Iobuf.t -> Async.Socket.Address.Inet.t -> Async.Unix.Syscall_result.Unit.t) Core.Or_error.t
sendto_sync sock buf addr does not try again if sock is not ready to write. Instead, it returns EWOULDBLOCK immediately.
Short writes are distinguished by buf not being empty afterward.
See also Iobuf.sendto_nonblocking_no_sigpipe and Bigstring.sendto_nonblocking_no_sigpipe.
- raises Failure on internal errors, but returns Unix.error via Unix.Syscall_result.Unit.t rather than raising Unix_error.
val send_sync : unit -> (Async.Fd.t -> ([> Core.read ], Core.Iobuf.seek) Core.Iobuf.t -> Async.Unix.Syscall_result.Unit.t) Core.Or_error.t
send_sync sock buf has identical semantics to sendto_sync, but is intended for connected UDP sockets (and therefore does not require a "to" address).
See also Iobuf.send_nonblocking_no_sigpipe and Bigstring.send_nonblocking_no_sigpipe.
- raises Failure on internal errors, but returns Unix.error via Unix.Syscall_result.Unit.t rather than raising Unix_error.
val sendto : unit -> (Async.Fd.t -> ([> Core.read ], Core.Iobuf.seek) Core.Iobuf.t -> Async.Socket.Address.Inet.t -> unit Async.Deferred.t) Core.Or_error.t
sendto sock buf addr retries if sock is not ready to write.
- raises Unix_error in the case of Unix output errors and Failure on internal errors.
val send : unit -> (Async.Fd.t -> ([> Core.read ], Core.Iobuf.seek) Core.Iobuf.t -> unit Async.Deferred.t) Core.Or_error.t
send sock buf retries if sock is not ready to write.
- raises Unix_error in the case of Unix output errors and Failure on internal errors.
val bind : ?ifname:string -> ?source:Async.Unix.Inet_addr.t -> Async.Socket.Address.Inet.t -> ([ `Bound ], Async.Socket.Address.Inet.t) Async.Socket.t
bind ?ifname ?source address creates a socket bound to address, and if address is a multicast address,
- joins the multicast group.
- if ifname is specified, joins the multicast group on ifname.
- if source is specified, joins the multicast group with source.
val bind_any : unit -> ([ `Bound ], Async.Socket.Address.Inet.t) Async.Socket.t
module Loop_result : sig ... end
val recvfrom_loop : ?config:Config.t -> Async.Fd.t -> (write_buffer -> Async.Socket.Address.Inet.t -> unit) -> Loop_result.t Async.Deferred.t
Loops, including recvfrom_loop, terminate normally when the socket is closed.
val recvfrom_loop_with_buffer_replacement : ?config:Config.t -> Async.Fd.t -> (write_buffer -> Async.Socket.Address.Inet.t -> write_buffer) -> Loop_result.t Async.Deferred.t
recvfrom_loop_with_buffer_replacement callback calls callback synchronously on each message received. callback returns the packet buffer for subsequent iterations, so it can replace the initial packet buffer when necessary. This enables immediate buffer reuse in the common case and fallback to allocation if we want to save the packet buffer for asynchronous processing.
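The buffer-replacement idea can be sketched with a toy loop built on the standard library alone. Everything here is an assumption made for illustration: `run_loop` is a hypothetical stand-in for the library's loop, packets come from a list rather than a socket, buffers are plain `Bytes.t` rather than Iobufs, and the rule "keep packets starting with '!'" is invented for the demo.

```ocaml
(* Toy model of a receive loop with buffer replacement. [run_loop] is a
   hypothetical stand-in for the library's loop; packets come from a list
   rather than a socket, and buffers are plain [Bytes.t], not Iobufs. *)
let capacity = 1472

let run_loop packets callback =
  let rec go buf = function
    | [] -> ()
    | packet :: rest ->
      let len = String.length packet in
      Bytes.blit_string packet 0 buf 0 len;
      (* The callback's return value is the buffer for the next iteration. *)
      let next = callback buf len in
      go next rest
  in
  go (Bytes.create capacity) packets

(* Packets kept for later processing, each paired with its length. *)
let saved : (Bytes.t * int) list ref = ref []

let callback buf len =
  if len > 0 && Bytes.get buf 0 = '!' then begin
    (* Keep this buffer for later, so hand the loop a fresh one. *)
    saved := (buf, len) :: !saved;
    Bytes.create capacity
  end else
    (* Common case: done with the packet, so the same buffer is reused. *)
    buf

let () = run_loop [ "a"; "!keep"; "b"; "!also" ] callback
```

In this model only the two kept packets cost an allocation; the other iterations reuse the buffer they were handed, and a saved buffer is never overwritten because the loop no longer holds it.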
val read_loop : ?config:Config.t -> Async.Fd.t -> (write_buffer -> unit) -> Loop_result.t Async.Deferred.t
val read_loop_with_buffer_replacement : ?config:Config.t -> Async.Fd.t -> (write_buffer -> write_buffer) -> Loop_result.t Async.Deferred.t
val recvmmsg_loop : (?config:Config.t -> ?max_count:int -> ?on_wouldblock:(unit -> unit) -> Async.Fd.t -> (write_buffer array -> count:int -> unit) -> Loop_result.t Async.Deferred.t) Core.Or_error.t
recvmmsg_loop ~socket callback iteratively receives up to max_count packets at a time on socket and passes them to callback. Each packet is up to Iobuf.capacity bytes.
callback bufs ~count processes count packets synchronously.
Config.init config is used as a prototype for bufs and as one of the elements.